From 88278fdb3cfd6f49f9f0cb8f6c2add8ae9d44f7f Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Wed, 14 Jun 2023 21:42:25 +0800 Subject: [PATCH 01/10] [INLONG-8244][Agent]thread leaks after job finished --- .../plugin/sources/reader/file/TriggerFileReader.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java index b747ae35953..380c918a5fb 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java @@ -17,13 +17,12 @@ package org.apache.inlong.agent.plugin.sources.reader.file; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.JobConstants; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.Reader; -import org.apache.commons.lang3.StringUtils; - import javax.validation.constraints.NotNull; public class TriggerFileReader implements Reader { @@ -31,6 +30,8 @@ public class TriggerFileReader implements Reader { @NotNull private String triggerId; + public volatile boolean finished = false; + @Override public Message read() { try { @@ -44,7 +45,7 @@ public Message read() { @Override public boolean isFinished() { - return false; + return finished; } @Override @@ -69,7 +70,7 @@ public String getSnapshot() { @Override public void finishRead() { - + finished = true; } @Override From 02fd7471dd2904a7c13f84b70adcc5bd9108c72e Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Wed, 14 Jun 2023 21:47:36 +0800 Subject: [PATCH 02/10] [INLONG-8244][Agent]thread leaks after job finished --- 
.../agent/plugin/sources/reader/file/TriggerFileReader.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java index 380c918a5fb..627195f16c1 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java @@ -17,12 +17,13 @@ package org.apache.inlong.agent.plugin.sources.reader.file; -import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.JobConstants; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.Reader; +import org.apache.commons.lang3.StringUtils; + import javax.validation.constraints.NotNull; public class TriggerFileReader implements Reader { From 797e83d49d0da871e3d07bb950947407596aee7c Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 15 Jun 2023 11:08:16 +0800 Subject: [PATCH 03/10] [INLONG-8244][Agent]thread leaks after job finished --- .../agent/plugin/sources/reader/file/TriggerFileReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java index 627195f16c1..f87de408cc9 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java @@ -31,7 +31,7 @@ public class TriggerFileReader 
implements Reader { @NotNull private String triggerId; - public volatile boolean finished = false; + private volatile boolean finished = false; @Override public Message read() { From 189ba4a601caaafee85899987828bb598cab72c1 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 15 Jun 2023 14:16:39 +0800 Subject: [PATCH 04/10] [INLONG-8251][agent] add global memory limit for file collect Add a global memory limit for file collection, covering three modules: reader, channel and writer. The limit is enforced with semaphores. --- .../agent/constant/CommonConstants.java | 5 +- .../agent/constant/FetcherConstants.java | 12 ++ .../agent/message/PackProxyMessage.java | 11 +- .../inlong/agent/core/AgentManager.java | 14 +- .../inlong/agent/core/HeartbeatManager.java | 12 ++ .../inlong/agent/core/task/MemoryManager.java | 115 ++++++++++ ...itionManager.java => PositionManager.java} | 91 ++++---- .../inlong/agent/core/task/TaskManager.java | 1 - .../inlong/agent/plugin/sinks/KafkaSink.java | 6 +- .../inlong/agent/plugin/sinks/ProxySink.java | 30 +-- .../inlong/agent/plugin/sinks/PulsarSink.java | 6 +- .../agent/plugin/sinks/SenderManager.java | 81 ++----- .../agent/plugin/sources/TextFileSource.java | 5 - .../reader/file/FileReaderOperator.java | 203 +++++++++++------- .../sources/reader/file/MonitorTextFile.java | 25 ++- .../apache/inlong/agent/plugin/MiniAgent.java | 14 +- .../inlong/agent/plugin/TestFileAgent.java | 2 +- 17 files changed, 401 insertions(+), 232 deletions(-) create mode 100644 inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java rename inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/{TaskPositionManager.java => PositionManager.java} (65%) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java index 81fbc7b202b..1cd9cbf0290 100644 --- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java @@ -64,9 +64,6 @@ public class CommonConstants { // max size of single batch in bytes, default is 800KB. public static final int DEFAULT_PROXY_PACKAGE_MAX_SIZE = 800000; - public static final String PROXY_MESSAGE_SEMAPHORE = "proxy.semaphore"; - public static final int DEFAULT_PROXY_MESSAGE_SEMAPHORE = 20000; - public static final String PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = "proxy.group.queue.maxNumber"; public static final int DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER = 10000; @@ -74,7 +71,7 @@ public class CommonConstants { public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000; public static final String PROXY_BATCH_FLUSH_INTERVAL = "proxy.batch.flush.interval"; - public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 1000; + public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 100; public static final String PROXY_SENDER_MAX_TIMEOUT = "proxy.sender.maxTimeout"; // max timeout in seconds. 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java index f45242bc1eb..6b48ff1a25b 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java @@ -67,4 +67,16 @@ public class FetcherConstants { public static final String AGENT_MANAGER_AUTH_SECRET_ID = "agent.manager.auth.secretId"; public static final String AGENT_MANAGER_AUTH_SECRET_KEY = "agent.manager.auth.secretKey"; + + public static final String AGENT_GLOBAL_READER_SOURCE_PERMIT = "agent.global.reader.source.permit"; + public static final int DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT = 16 * 1000 * 1000; + + public static final String AGENT_GLOBAL_READER_QUEUE_PERMIT = "agent.global.reader.queue.permit"; + public static final int DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT = 16 * 1000 * 1000; + + public static final String AGENT_GLOBAL_CHANNEL_PERMIT = "agent.global.channel.permit"; + public static final int DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT = 16 * 1000 * 1000; + + public static final String AGENT_GLOBAL_WRITER_PERMIT = "agent.global.writer.permit"; + public static final int DEFAULT_AGENT_GLOBAL_WRITER_PERMIT = 96 * 1000 * 1000; } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java index 15d558748e2..8e257b39729 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java @@ -145,19 +145,18 @@ public BatchProxyMessage fetchBatch() { // pre check message size ProxyMessage peekMessage = messageQueue.peek(); int 
peekMessageLength = peekMessage.getBody().length; + if (resultBatchSize + peekMessageLength > maxPackSize) { + break; + } + ProxyMessage message = messageQueue.remove(); + int bodySize = message.getBody().length; if (peekMessageLength > maxPackSize) { LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", peekMessage.getBody().length, maxPackSize); - int bodySize = peekMessage.getBody().length; queueSize.addAndGet(-bodySize); messageQueue.remove(); break; } - if (resultBatchSize + peekMessageLength > maxPackSize) { - break; - } - ProxyMessage message = messageQueue.remove(); - int bodySize = message.getBody().length; resultBatchSize += bodySize; // decrease queue size. queueSize.addAndGet(-bodySize); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java index d7b921f355d..11f9ce793e2 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java @@ -25,8 +25,8 @@ import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.core.conf.ConfigJetty; import org.apache.inlong.agent.core.job.JobManager; +import org.apache.inlong.agent.core.task.PositionManager; import org.apache.inlong.agent.core.task.TaskManager; -import org.apache.inlong.agent.core.task.TaskPositionManager; import org.apache.inlong.agent.core.trigger.TriggerManager; import org.apache.inlong.agent.db.CommandDb; import org.apache.inlong.agent.db.Db; @@ -57,7 +57,7 @@ public class AgentManager extends AbstractDaemon { private final JobManager jobManager; private final TaskManager taskManager; private final TriggerManager triggerManager; - private final TaskPositionManager taskPositionManager; + private final PositionManager positionManager; private final HeartbeatManager heartbeatManager; private final 
ProfileFetcher fetcher; private final AgentConfiguration conf; @@ -82,7 +82,7 @@ public AgentManager() { taskManager = new TaskManager(this); fetcher = initFetcher(this); heartbeatManager = HeartbeatManager.getInstance(this); - taskPositionManager = TaskPositionManager.getInstance(this); + positionManager = PositionManager.getInstance(this); // need to be an option. if (conf.getBoolean( AgentConstants.AGENT_ENABLE_HTTP, AgentConstants.DEFAULT_AGENT_ENABLE_HTTP)) { @@ -174,8 +174,8 @@ public TriggerManager getTriggerManager() { return triggerManager; } - public TaskPositionManager getTaskPositionManager() { - return taskPositionManager; + public PositionManager getTaskPositionManager() { + return positionManager; } public TaskManager getTaskManager() { @@ -206,7 +206,7 @@ public void start() throws Exception { LOGGER.info("starting heartbeat manager"); heartbeatManager.start(); LOGGER.info("starting task position manager"); - taskPositionManager.start(); + positionManager.start(); LOGGER.info("starting read job from local"); // read job profiles from local List profileList = localProfile.readFromLocal(); @@ -249,7 +249,7 @@ public void stop() throws Exception { jobManager.stop(); taskManager.stop(); heartbeatManager.stop(); - taskPositionManager.stop(); + positionManager.stop(); agentConfMonitor.shutdown(); this.db.close(); } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index 9214de04e87..b945969d6f8 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -23,6 +23,7 @@ import org.apache.inlong.agent.core.job.Job; import org.apache.inlong.agent.core.job.JobManager; import org.apache.inlong.agent.core.job.JobWrapper; +import org.apache.inlong.agent.core.task.MemoryManager; import 
org.apache.inlong.agent.state.State; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.HttpManager; @@ -69,6 +70,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbeatManager { private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class); + public static final int PRINT_MEMORY_PERMIT_INTERVAL_SECOND = 60; private static HeartbeatManager heartbeatManager = null; private final JobManager jobmanager; private final AgentConfiguration conf; @@ -122,6 +124,16 @@ public static HeartbeatManager getInstance() { public void start() throws Exception { submitWorker(snapshotReportThread()); submitWorker(heartbeatReportThread()); + submitWorker(printMemoryPermitThread()); + } + + private Runnable printMemoryPermitThread() { + return () -> { + while (isRunnable()) { + MemoryManager.getInstance().printAll(); + AgentUtils.silenceSleepInSeconds(PRINT_MEMORY_PERMIT_INTERVAL_SECOND); + } + }; } private Runnable snapshotReportThread() { diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java new file mode 100644 index 00000000000..d67a15fb5ab --- /dev/null +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.task; + +import org.apache.inlong.agent.conf.AgentConfiguration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT; + +/** + * used to limit global memory to avoid oom + */ +public class MemoryManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryManager.class); + private static volatile MemoryManager memoryManager = null; + private final AgentConfiguration conf; + private ConcurrentHashMap semaphoreMap = new ConcurrentHashMap<>(); + + private MemoryManager() { + this.conf = AgentConfiguration.getAgentConf(); + Semaphore semaphore = null; + semaphore = new Semaphore( + 
conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT)); + semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore); + + semaphore = new Semaphore( + conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT)); + semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore); + + semaphore = new Semaphore( + conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT)); + semaphoreMap.put(AGENT_GLOBAL_CHANNEL_PERMIT, semaphore); + + semaphore = new Semaphore( + conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT)); + semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore); + } + + /** + * manager singleton + */ + public static MemoryManager getInstance() { + if (memoryManager == null) { + synchronized (MemoryManager.class) { + if (memoryManager == null) { + memoryManager = new MemoryManager(); + } + } + } + return memoryManager; + } + + public boolean tryAcquire(String semaphoreName, int permit) { + Semaphore semaphore = semaphoreMap.get(semaphoreName); + if (semaphore == null) { + LOGGER.error("tryAcquire {} not exist"); + return false; + } + return semaphore.tryAcquire(permit); + } + + public void release(String semaphoreName, int permit) { + Semaphore semaphore = semaphoreMap.get(semaphoreName); + if (semaphore == null) { + LOGGER.error("release {} not exist"); + return; + } + semaphore.release(permit); + } + + public void printDetail(String semaphoreName) { + Semaphore semaphore = semaphoreMap.get(semaphoreName); + if (semaphore == null) { + LOGGER.error("printDetail {} not exist"); + return; + } + LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), semaphore.getQueueLength(), + semaphoreName); + } + + public void printAll() { + printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT); + printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT); + printDetail(AGENT_GLOBAL_CHANNEL_PERMIT); + printDetail(AGENT_GLOBAL_WRITER_PERMIT); + } +} diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java similarity index 65% rename from inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java rename to inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java index 85f94710d38..48f6525d019 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java @@ -39,16 +39,16 @@ * where key is task read file name and value is task sink position * note that this class is generated */ -public class TaskPositionManager extends AbstractDaemon { +public class PositionManager extends AbstractDaemon { - private static final Logger LOGGER = LoggerFactory.getLogger(TaskPositionManager.class); - private static volatile TaskPositionManager taskPositionManager = null; + private static final Logger LOGGER = LoggerFactory.getLogger(PositionManager.class); + private static volatile PositionManager positionManager = null; private final AgentManager agentManager; private final JobProfileDb jobConfDb; private final AgentConfiguration conf; private ConcurrentHashMap> jobTaskPositionMap; - private TaskPositionManager(AgentManager agentManager) { + private PositionManager(AgentManager agentManager) { this.conf = AgentConfiguration.getAgentConf(); this.agentManager = agentManager; this.jobConfDb = agentManager.getJobManager().getJobConfDb(); @@ -58,25 +58,25 @@ private TaskPositionManager(AgentManager agentManager) { /** * task position manager singleton, can only generated by agent manager */ - public static TaskPositionManager getInstance(AgentManager agentManager) { - if (taskPositionManager == null) { - synchronized (TaskPositionManager.class) { - if (taskPositionManager == null) { - 
taskPositionManager = new TaskPositionManager(agentManager); + public static PositionManager getInstance(AgentManager agentManager) { + if (positionManager == null) { + synchronized (PositionManager.class) { + if (positionManager == null) { + positionManager = new PositionManager(agentManager); } } } - return taskPositionManager; + return positionManager; } /** * get taskPositionManager singleton */ - public static TaskPositionManager getInstance() { - if (taskPositionManager == null) { + public static PositionManager getInstance() { + if (positionManager == null) { throw new RuntimeException("task position manager has not been initialized by agentManager"); } - return taskPositionManager; + return positionManager; } @Override @@ -87,30 +87,34 @@ public void start() throws Exception { private Runnable taskPositionFlushThread() { return () -> { while (isRunnable()) { - try { - // check pending jobs and try to submit again. - for (String jobId : jobTaskPositionMap.keySet()) { - JobProfile jobProfile = jobConfDb.getJobById(jobId); - if (jobProfile == null) { - LOGGER.warn("jobProfile {} cannot be found in db, " - + "might be deleted by standalone mode, now delete job position in memory", jobId); - deleteJobPosition(jobId); - continue; - } - flushJobProfile(jobId, jobProfile); - } - } catch (Throwable ex) { - LOGGER.error("error caught", ex); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); - } finally { - int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL, - DEFAULT_AGENT_FETCHER_INTERVAL); - AgentUtils.silenceSleepInSeconds(flushTime); - } + doFlush(); } }; } + private void doFlush() { + try { + // check pending jobs and try to submit again. 
+ for (String jobId : jobTaskPositionMap.keySet()) { + JobProfile jobProfile = jobConfDb.getJobById(jobId); + if (jobProfile == null) { + LOGGER.warn("jobProfile {} cannot be found in db, " + + "might be deleted by standalone mode, now delete job position in memory", jobId); + deleteJobPosition(jobId); + continue; + } + flushJobProfile(jobId, jobProfile); + } + } catch (Throwable ex) { + LOGGER.error("error caught", ex); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } finally { + int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL, + DEFAULT_AGENT_FETCHER_INTERVAL); + AgentUtils.silenceSleepInSeconds(flushTime); + } + } + private void flushJobProfile(String jobId, JobProfile jobProfile) { jobTaskPositionMap.get(jobId).forEach( (fileName, position) -> jobProfile.setLong(fileName + POSITION_SUFFIX, position)); @@ -134,17 +138,22 @@ public void stop() throws Exception { /** * update job sink position * - * @param newPosition + * @param size add this size to beforePosition */ - public void updateSinkPosition(String jobInstanceId, String sourcePath, long newPosition) { - LOGGER.info("updateSinkPosition jobInstanceId {} sourcePath {} newPosition {}", jobInstanceId, sourcePath, - newPosition); + public void updateSinkPosition(String jobInstanceId, String sourcePath, long size, boolean reset) { ConcurrentHashMap positionTemp = new ConcurrentHashMap<>(); - ConcurrentHashMap lastPosition = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp); - if (lastPosition == null) { - positionTemp.put(sourcePath, newPosition); + ConcurrentHashMap position = jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp); + if (position == null) { + JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId); + positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + POSITION_SUFFIX, 0)); + position = positionTemp; + } + + if (!reset) { + Long beforePosition = position.getOrDefault(sourcePath, 0L); + position.put(sourcePath, beforePosition + size); } else { 
- lastPosition.put(sourcePath, newPosition); + position.put(sourcePath, size); } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java index 1aa03e7d943..344cafba2eb 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java @@ -81,7 +81,6 @@ public TaskManager(AgentManager agentManager) { this.taskMetrics = new AgentMetricItemSet(this.getClass().getSimpleName()); this.dimensions = new HashMap<>(); this.dimensions.put(KEY_COMPONENT_NAME, this.getClass().getSimpleName()); - MetricRegister.unregister(taskMetrics); MetricRegister.register(taskMetrics); tasks = new ConcurrentHashMap<>(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java index 22f96d8eaf5..1e9029e3349 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java @@ -20,7 +20,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.core.task.TaskPositionManager; +import org.apache.inlong.agent.core.task.PositionManager; import org.apache.inlong.agent.message.BatchProxyMessage; import org.apache.inlong.agent.message.EndMessage; import org.apache.inlong.agent.message.PackProxyMessage; @@ -76,7 +76,7 @@ public class KafkaSink extends AbstractSink { private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new 
AgentThreadFactory("KafkaSink")); private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); - private TaskPositionManager taskPositionManager; + private PositionManager taskPositionManager; private volatile boolean shutdown = false; private List mqClusterInfos; @@ -92,7 +92,7 @@ public class KafkaSink extends AbstractSink { @Override public void init(JobProfile jobConf) { super.init(jobConf); - taskPositionManager = TaskPositionManager.getInstance(); + taskPositionManager = PositionManager.getInstance(); int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE); kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize); producerNum = agentConf.getInt(KAFKA_SINK_PRODUCER_NUM, DEFAULT_PRODUCER_NUM); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index 248796a9ecd..7bf723e9650 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -20,6 +20,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.core.task.MemoryManager; import org.apache.inlong.agent.message.BatchProxyMessage; import org.apache.inlong.agent.message.EndMessage; import org.apache.inlong.agent.message.PackProxyMessage; @@ -43,6 +44,8 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT; +import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; /** * sink message data to inlong-dataproxy @@ -68,12 +71,6 @@ public void write(Message message) { if (message == null) { return; } - // if the message size is greater than max pack size,should drop it. - if (message.getBody().length > maxPackSize) { - LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", - message.getBody().length, maxPackSize); - return; - } boolean suc = false; while (!suc) { suc = putInCache(message); @@ -98,6 +95,13 @@ private boolean putInCache(Message message) { } AtomicBoolean suc = new AtomicBoolean(false); ProxyMessage proxyMessage = new ProxyMessage(message); + boolean writerPermitSuc = MemoryManager.getInstance() + .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length); + if (!writerPermitSuc) { + LOGGER.warn("writer tryAcquire failed"); + MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT); + return false; + } // add proxy message to cache. 
cache.compute(proxyMessage.getBatchKey(), (s, packProxyMessage) -> { @@ -111,11 +115,11 @@ private boolean putInCache(Message message) { return packProxyMessage; }); if (suc.get()) { - // semaphore should be acquired only when the message was put in cache successfully - senderManager.acquireSemaphore(1); + MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, message.getBody().length); // increment the count of successful sinks sinkMetric.sinkSuccessCount.incrementAndGet(); } else { + MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length); // increment the count of failed sinks sinkMetric.sinkFailCount.incrementAndGet(); } @@ -147,7 +151,7 @@ private void extractStreamFromMessage(Message message, byte[] fieldSplitter) { */ private Runnable flushCache() { return () -> { - LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId); + LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName); while (!shutdown) { try { cache.forEach((batchKey, packProxyMessage) -> { @@ -159,7 +163,6 @@ private Runnable flushCache() { batchProxyMessage.getDataList().size(), jobInstanceId, sourceName, batchProxyMessage.getDataTime()); } - }); } catch (Exception ex) { LOGGER.error("error caught", ex); @@ -169,6 +172,7 @@ private Runnable flushCache() { AgentUtils.silenceSleepInMs(batchFlushInterval); } } + LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName); }; } @@ -182,7 +186,6 @@ public void init(JobProfile jobConf) { executorService.execute(flushCache()); senderManager = new SenderManager(jobConf, inlongGroupId, sourceName); try { - senderManager.addMessageSender(); senderManager.Start(); } catch (Throwable ex) { LOGGER.error("error while init sender for group id {}", inlongGroupId); @@ -193,14 +196,15 @@ public void init(JobProfile jobConf) { @Override public void destroy() { - LOGGER.info("destroy sink which sink from source name {}", sourceName); + LOGGER.info("destroy sink source name {}", sourceName); 
while (!sinkFinish()) { - LOGGER.info("job {} wait until cache all flushed to proxy", jobInstanceId); + LOGGER.info("sourceName {} wait until cache all flushed to proxy", sourceName); AgentUtils.silenceSleepInMs(batchFlushInterval); } shutdown = true; executorService.shutdown(); senderManager.Stop(); + LOGGER.info("destroy sink source name {} end", sourceName); } /** diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java index d2801e4b6d8..348cd1c4b75 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java @@ -20,7 +20,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.core.task.TaskPositionManager; +import org.apache.inlong.agent.core.task.PositionManager; import org.apache.inlong.agent.message.BatchProxyMessage; import org.apache.inlong.agent.message.EndMessage; import org.apache.inlong.agent.message.PackProxyMessage; @@ -91,7 +91,7 @@ public class PulsarSink extends AbstractSink { private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("PulsarSink")); private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); - private TaskPositionManager taskPositionManager; + private PositionManager positionManager; private volatile boolean shutdown = false; private List mqClusterInfos; private String topic; @@ -119,7 +119,7 @@ public class PulsarSink extends AbstractSink { @Override public void init(JobProfile jobConf) { super.init(jobConf); - taskPositionManager = TaskPositionManager.getInstance(); + 
positionManager = PositionManager.getInstance(); // agentConf sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE); sendQueueSemaphore = new Semaphore(sendQueueSize); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index 615f4700bd6..3447eb00588 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -21,7 +21,8 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.CommonConstants; -import org.apache.inlong.agent.core.task.TaskPositionManager; +import org.apache.inlong.agent.core.task.MemoryManager; +import org.apache.inlong.agent.core.task.PositionManager; import org.apache.inlong.agent.message.BatchProxyMessage; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; @@ -40,14 +41,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -56,6 +53,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; @@ -73,18 +71,16 @@ public class SenderManager { private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class); private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance(); - private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0); + private final AtomicInteger SENDER_INDEX = new AtomicInteger(0); // cache for group and sender list, share the map cross agent lifecycle. - private static final ConcurrentHashMap> SENDER_MAP = - new ConcurrentHashMap<>(); + private DefaultMessageSender sender; private LinkedBlockingQueue resendQueue; private final ExecutorService resendExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new AgentThreadFactory("SendManager-Resend")); // sharing worker threads between sender client // in case of thread abusing. 
- private static final ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-client-io", - Thread.currentThread().isDaemon()); + private ThreadFactory SHARED_FACTORY; private static final AtomicLong METRIC_INDEX = new AtomicLong(0); private final String managerHost; private final int managerPort; @@ -104,14 +100,12 @@ public class SenderManager { private final String sourcePath; private final boolean proxySend; private volatile boolean shutdown = false; - // metric private AgentMetricItemSet metricItemSet; private Map dimensions; - private TaskPositionManager taskPositionManager; + private PositionManager positionManager; private int ioThreadNum; private boolean enableBusyWait; - private Semaphore semaphore; private String authSecretId; private String authSecretKey; protected int batchFlushInterval; @@ -144,9 +138,7 @@ public SenderManager(JobProfile jobConf, String inlongGroupId, String sourcePath retrySleepTime = jobConf.getLong( CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP); isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE); - taskPositionManager = TaskPositionManager.getInstance(); - semaphore = new Semaphore(jobConf.getInt(CommonConstants.PROXY_MESSAGE_SEMAPHORE, - CommonConstants.DEFAULT_PROXY_MESSAGE_SEMAPHORE)); + positionManager = PositionManager.getInstance(); ioThreadNum = jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM, CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); enableBusyWait = jobConf.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT, @@ -167,13 +159,15 @@ public SenderManager(JobProfile jobConf, String inlongGroupId, String sourcePath resendQueue = new LinkedBlockingQueue<>(); } - public void Start() { + public void Start() throws Exception { + sender = createMessageSender(inlongGroupId); resendExecutorService.execute(flushResendQueue()); } public void Stop() { shutdown = true; resendExecutorService.shutdown(); + sender.close(); } private 
AgentMetricItem getMetricItem(Map otherDimensions) { @@ -190,26 +184,6 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) { return getMetricItem(dims); } - /** - * Select by group. - * - * @param group inlong group id - * @return default message sender - */ - private DefaultMessageSender selectSender(String group) { - List senderList = SENDER_MAP.get(group); - return senderList.get((SENDER_INDEX.getAndIncrement() & 0x7FFFFFFF) % senderList.size()); - } - - public void acquireSemaphore(int messageNum) { - try { - semaphore.acquire(messageNum); - } catch (Exception e) { - LOGGER.error("acquire messageNum {} fail, current semaphore {}", - messageNum, semaphore.availablePermits()); - } - } - /** * sender * @@ -228,28 +202,15 @@ private DefaultMessageSender createMessageSender(String tagName) throws Exceptio proxyClientConfig.setEnableBusyWait(enableBusyWait); proxyClientConfig.setProtocolType(ProtocolType.TCP); + SHARED_FACTORY = new DefaultThreadFactory("agent-client-" + sourcePath, + Thread.currentThread().isDaemon()); + DefaultMessageSender sender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY); sender.setMsgtype(msgType); sender.setCompress(isCompress); return sender; } - /** - * Add new sender for group id if max size is not satisfied. 
- */ - public void addMessageSender() throws Exception { - List tmpList = new ArrayList<>(); - List senderList = SENDER_MAP.putIfAbsent(inlongGroupId, tmpList); - if (senderList == null) { - senderList = tmpList; - } - if (senderList.size() > maxSenderPerGroup) { - return; - } - DefaultMessageSender sender = createMessageSender(inlongGroupId); - senderList.add(sender); - } - public void sendBatch(BatchProxyMessage batchMessage) { sendBatchWithRetryCount(batchMessage, 0); } @@ -261,7 +222,7 @@ private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int retry) boolean suc = false; while (!suc) { try { - selectSender(batchMessage.getGroupId()).asyncSendMessage(new AgentSenderCallback(batchMessage, retry), + sender.asyncSendMessage(new AgentSenderCallback(batchMessage, retry), batchMessage.getDataList(), batchMessage.getGroupId(), batchMessage.getStreamId(), batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS, batchMessage.getExtraMap(), proxySend); @@ -271,13 +232,14 @@ private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int retry) } catch (Exception exception) { suc = false; if (retry > maxSenderRetry) { - LOGGER.warn("max retry reached, retry count is {}, sleep and send again", retry); + if (retry % 10 == 0) { + LOGGER.error("max retry reached, sample log Exception caught", exception); + } } else { LOGGER.error("Exception caught", exception); } retry++; AgentUtils.silenceSleepInMs(retrySleepTime); - } } } @@ -289,7 +251,7 @@ private void sendBatchWithRetryCount(BatchProxyMessage batchMessage, int retry) */ private Runnable flushResendQueue() { return () -> { - LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId); + LOGGER.info("start flush resend queue {}:{}", inlongGroupId, sourcePath); while (!shutdown) { try { AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); @@ -304,6 +266,7 @@ private Runnable flushResendQueue() { 
AgentUtils.silenceSleepInMs(batchFlushInterval); } } + LOGGER.info("stop flush resend queue {}:{}", inlongGroupId, sourcePath); }; } @@ -342,10 +305,12 @@ public void onMessageAck(SendResult result) { String jobId = batchMessage.getJobId(); long dataTime = batchMessage.getDataTime(); if (result != null && result.equals(SendResult.OK)) { - semaphore.release(msgCnt); + MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) batchMessage.getTotalSize()); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, msgCnt, batchMessage.getTotalSize()); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); + PositionManager.getInstance() + .updateSinkPosition(batchMessage.getJobId(), sourcePath, msgCnt, false); } else { LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime {} fail with times {}, " + "error {}", groupId, streamId, jobId, dataTime, retry, result); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java index f288ae7b3a0..d680d49f0bf 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java @@ -70,7 +70,6 @@ public List split(JobProfile jobConf) { FileReaderOperator fileReader = new FileReaderOperator(file, startPosition); long waitTimeout = jobConf.getLong(JOB_READ_WAIT_TIMEOUT, DEFAULT_JOB_READ_WAIT_TIMEOUT); fileReader.setWaitMillisecond(waitTimeout); - addValidator(filterPattern, fileReader); result.add(fileReader); } // increment the count of successful sources @@ -83,8 +82,4 @@ private int getStartPosition(JobProfile jobConf, File file) { seekPosition = jobConf.getInt(file.getAbsolutePath() + POSITION_SUFFIX, 0); return seekPosition; } - - private void addValidator(String 
filterPattern, FileReaderOperator fileReader) { - fileReader.addPatternValidator(filterPattern); - } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java index 82c3a405922..bcd4b592729 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java @@ -22,15 +22,14 @@ import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.DataCollectType; import org.apache.inlong.agent.constant.JobConstants; -import org.apache.inlong.agent.core.task.TaskPositionManager; +import org.apache.inlong.agent.core.task.MemoryManager; +import org.apache.inlong.agent.core.task.PositionManager; import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.Validator; import org.apache.inlong.agent.plugin.sources.reader.AbstractReader; import org.apache.inlong.agent.plugin.utils.FileDataUtils; -import org.apache.inlong.agent.plugin.validator.PatternValidator; import org.apache.inlong.agent.utils.AgentUtils; import com.google.gson.Gson; @@ -54,7 +53,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -64,6 +62,9 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MAX_WAIT; import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_META_ENV_LIST; @@ -85,11 +86,12 @@ public class FileReaderOperator extends AbstractReader { private static final Logger LOGGER = LoggerFactory.getLogger(FileReaderOperator.class); public static final int NEVER_STOP_SIGN = -1; - public static final int BATCH_READ_SIZE = 10000; - public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_SIZE; - public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; - private static final SimpleDateFormat RECORD_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - private static final Gson GSON = new Gson(); + public static final int BATCH_READ_LINE_COUNT = 10000; + public static final int BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024; + public static final int CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT; + public static int DEFAULT_BUFFER_SIZE = 64 * 1024; + private final SimpleDateFormat RECORD_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private final Gson GSON = new Gson(); public File file; public long position = 0; @@ -104,11 +106,11 @@ public class FileReaderOperator extends AbstractReader { public String fileKey = null; private long timeout; private long waitTimeout; + public volatile long monitorUpdateTime; private long lastTime = 0; - private List validators = new ArrayList<>(); - private static final byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE]; - private static int maxPackSize; - + private final 
byte[] inBuf = new byte[DEFAULT_BUFFER_SIZE]; + private int maxPackSize; + private final long monitorActiveInterval = 60 * 1000; private final BlockingQueue queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); private final StringBuffer sb = new StringBuffer(); @@ -124,31 +126,67 @@ public FileReaderOperator(File file, int position, String md5) { this.metadata = new HashMap<>(); } - public FileReaderOperator(File file) { - this(file, 0); - } - @Override public Message read() { String data = null; try { data = queue.poll(DEFAULT_JOB_READ_WAIT_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.warn("poll {} data get interruptted.", file.getPath(), e); + LOGGER.warn("poll {} data get interrupted.", file.getPath(), e); + } + if (data == null) { + keepMonitorActive(); + return null; + } else { + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, data.length()); + } + Message finalMsg = createMessage(data); + if (finalMsg == null) { + return null; } - return Optional.ofNullable(data) - .map(this::metadataMessage) - .filter(this::filterMessage) - .map(message -> { - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis(), 1, message.length()); - readerMetric.pluginReadSuccessCount.incrementAndGet(); - readerMetric.pluginReadCount.incrementAndGet(); - String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); - Map header = new HashMap<>(); - header.put(PROXY_KEY_DATA, proxyPartitionKey); - return new DefaultMessage(message.getBytes(StandardCharsets.UTF_8), header); - }).orElse(null); + boolean channelPermit = MemoryManager.getInstance() + .tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, finalMsg.getBody().length); + if (channelPermit == false) { + LOGGER.warn("channel tryAcquire failed"); + MemoryManager.getInstance().printDetail(AGENT_GLOBAL_CHANNEL_PERMIT); + AgentUtils.silenceSleepInSeconds(1); + return null; + } + return finalMsg; + } + 
+ private Message createMessage(String data) { + String msgWithMetaData = fillMetaData(data); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, + System.currentTimeMillis(), 1, msgWithMetaData.length()); + readerMetric.pluginReadSuccessCount.incrementAndGet(); + readerMetric.pluginReadCount.incrementAndGet(); + String proxyPartitionKey = jobConf.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); + Map header = new HashMap<>(); + header.put(PROXY_KEY_DATA, proxyPartitionKey); + Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); + // if the message size is greater than max pack size,should drop it. + if (finalMsg.getBody().length > maxPackSize) { + LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", + finalMsg.getBody().length, maxPackSize); + return null; + } + return finalMsg; + } + + public void keepMonitorActive() { + if (!isMonitorActive()) { + LOGGER.error("monitor not active, create a new one"); + MonitorTextFile.getInstance().monitor(this); + } + } + + private boolean isMonitorActive() { + long currentTime = System.currentTimeMillis(); + if (currentTime - monitorUpdateTime > monitorActiveInterval) { + return false; + } + return true; } @Override @@ -207,13 +245,6 @@ public boolean isSourceExist() { return true; } - public void addPatternValidator(String pattern) { - if (pattern.isEmpty()) { - return; - } - validators.add(new PatternValidator(pattern)); - } - @Override public void init(JobProfile jobConf) { try { @@ -228,22 +259,23 @@ public void init(JobProfile jobConf) { LOGGER.warn("md5 is differ from origin, origin: {}, new {}", this.md5, md5); } LOGGER.info("file name for task is {}, md5 is {}", file, md5); - + monitorUpdateTime = System.currentTimeMillis(); MonitorTextFile.getInstance().monitor(this); if (!jobConf.get(JOB_FILE_MONITOR_STATUS, JOB_FILE_MONITOR_DEFAULT_STATUS) .equals(JOB_FILE_MONITOR_DEFAULT_STATUS)) { readEndpoint = 
Files.lines(file.toPath()).count(); } try { - position = TaskPositionManager.getInstance().getPosition(getReadSource(), instanceId); + position = PositionManager.getInstance().getPosition(getReadSource(), instanceId); } catch (Exception ex) { position = 0; LOGGER.error("get position from position manager error, only occur in ut: {}", ex.getMessage()); } this.bytePosition = getStartBytePosition(position); - LOGGER.info("FileReaderOperator init file {} instanceId {} history position {}", getReadSource(), + LOGGER.info("FileReaderOperator init file {} instanceId {} history position {} readEndpoint {}", + getReadSource(), instanceId, - position); + position, readEndpoint); if (isIncrement(jobConf)) { LOGGER.info("FileReaderOperator DataCollectType INCREMENT: start bytePosition {},{}", file.length(), file.getAbsolutePath()); @@ -251,8 +283,8 @@ public void init(JobProfile jobConf) { try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(file.getPath()))) { lineNumberReader.skip(Long.MAX_VALUE); position = lineNumberReader.getLineNumber(); - TaskPositionManager.getInstance().updateSinkPosition( - getJobInstanceId(), getReadSource(), position); + PositionManager.getInstance().updateSinkPosition( + getJobInstanceId(), getReadSource(), position, true); LOGGER.info("for increment update {}, position to {}", file.getAbsolutePath(), position); } catch (IOException ex) { @@ -260,7 +292,7 @@ public void init(JobProfile jobConf) { } } try { - resiterMeta(jobConf); + registerMeta(jobConf); } catch (Exception ex) { LOGGER.error("init metadata error", ex); } @@ -278,8 +310,8 @@ private long getStartBytePosition(long lineNum) throws IOException { input = new RandomAccessFile(file, "r"); while (readCount < lineNum) { List lines = new ArrayList<>(); - pos = readLines(input, pos, lines, - Math.min((int) (lineNum - readCount), FileReaderOperator.BATCH_READ_SIZE)); + pos = readLines(input, pos, lines, Math.min((int) (lineNum - readCount), BATCH_READ_LINE_COUNT), + 
BATCH_READ_LINE_TOTAL_LEN, true); readCount += lines.size(); if (lines.size() == 0) { LOGGER.error("getStartBytePosition LineNum {} larger than the real file"); @@ -293,7 +325,7 @@ private long getStartBytePosition(long lineNum) throws IOException { input.close(); } } - LOGGER.info("getStartBytePosition LineNum {} position {}", lineNum, pos); + LOGGER.info("getStartBytePosition {} LineNum {} position {}", getReadSource(), lineNum, pos); return pos; } @@ -319,23 +351,26 @@ private void initReadTimeout(JobProfile jobConf) { @Override public void destroy() { + LOGGER.info("destroy read source name {}", getReadSource()); finished = true; + while (!queue.isEmpty()) { + String data = null; + try { + data = queue.poll(DEFAULT_JOB_READ_WAIT_TIMEOUT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn("poll {} data get interrupted.", file.getPath(), e); + } + if (data != null) { + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, data.length()); + } + } queue.clear(); + LOGGER.info("destroy read source name {} end", getReadSource()); LOGGER.info("destroy reader with read {} num {}", metricName, readerMetric == null ? 
0 : readerMetric.pluginReadCount.get()); } - public boolean filterMessage(String message) { - if (StringUtils.isBlank(message)) { - return false; - } - if (validators.isEmpty()) { - return true; - } - return validators.stream().allMatch(v -> v.validate(message)); - } - - public String metadataMessage(String message) { + public String fillMetaData(String message) { long timestamp = System.currentTimeMillis(); boolean isJson = FileDataUtils.isJSON(message); Map mergeData = new HashMap<>(metadata); @@ -348,7 +383,7 @@ public boolean hasDataRemaining() { return !queue.isEmpty(); } - public void resiterMeta(JobProfile jobConf) { + public void registerMeta(JobProfile jobConf) { if (!jobConf.hasKey(JOB_FILE_META_ENV_LIST)) { return; } @@ -365,10 +400,16 @@ public void resiterMeta(JobProfile jobConf) { } public void fetchData() throws IOException { - // todo: TaskPositionManager stored position should be changed to byte position.Now it store msg sent, so here - // every line (include empty line) should be sent, otherwise the read position will be offset when - // restarting and recovering. In the same time, Regex end line spiltted line also has this problem, because - // recovering is based on line position. 
+ boolean readFromPosPermit = false; + while (readFromPosPermit == false) { + readFromPosPermit = MemoryManager.getInstance() + .tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); + if (readFromPosPermit == false) { + LOGGER.warn("fetchData tryAcquire failed"); + MemoryManager.getInstance().printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT); + AgentUtils.silenceSleepInSeconds(1); + } + } List lines = readFromPos(bytePosition); if (!lines.isEmpty()) { LOGGER.info("path is {}, line is {}, byte position is {}, reads data lines {}", @@ -376,8 +417,17 @@ public void fetchData() throws IOException { } List resultLines = lines; resultLines.forEach(line -> { + boolean offerPermit = false; + while (offerPermit != true) { + offerPermit = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, line.length()); + if (offerPermit != true) { + LOGGER.warn("offerPermit tryAcquire failed"); + MemoryManager.getInstance().printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT); + AgentUtils.silenceSleepInSeconds(1); + } + } try { - boolean offerSuc = queue.offer(line, 1, TimeUnit.SECONDS); + boolean offerSuc = false; while (offerSuc != true) { offerSuc = queue.offer(line, 1, TimeUnit.SECONDS); } @@ -386,7 +436,9 @@ public void fetchData() throws IOException { LOGGER.error("fetchData offer failed {}", e.getMessage()); } }); + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); if (position >= readEndpoint) { + LOGGER.info("read to the end, set finished position {} readEndpoint {}", position, readEndpoint); finished = true; } } @@ -396,9 +448,8 @@ private List readFromPos(long pos) throws IOException { RandomAccessFile input = null; try { input = new RandomAccessFile(file, "r"); - bytePosition = readLines(input, pos, lines, FileReaderOperator.BATCH_READ_SIZE); + bytePosition = readLines(input, pos, lines, BATCH_READ_LINE_COUNT, BATCH_READ_LINE_TOTAL_LEN, false); position += lines.size(); - 
TaskPositionManager.getInstance().updateSinkPosition(getJobInstanceId(), getReadSource(), position); } catch (Exception e) { LOGGER.error("readFromPos error {}", e.getMessage()); } finally { @@ -416,7 +467,8 @@ private List readFromPos(long pos) throws IOException { * @return The new position after the lines have been read * @throws java.io.IOException if an I/O error occurs. */ - private static long readLines(RandomAccessFile reader, long pos, List lines, int maxLineCount) + private long readLines(RandomAccessFile reader, long pos, List lines, int maxLineCount, int maxLineTotalLen, + boolean isCounting) throws IOException { if (maxLineCount == 0) { return pos; @@ -425,6 +477,7 @@ private static long readLines(RandomAccessFile reader, long pos, List li reader.seek(pos); long rePos = pos; // position to re-read int num; + int lineTotalLen = 0; LOGGER.debug("readLines from {}", pos); boolean overLen = false; while ((num = reader.read(inBuf)) != -1) { @@ -433,7 +486,13 @@ private static long readLines(RandomAccessFile reader, long pos, List li byte ch = inBuf[i]; switch (ch) { case '\n': - lines.add(new String(baos.toByteArray())); + if (isCounting) { + lines.add(new String("")); + } else { + String temp = new String(baos.toByteArray(), StandardCharsets.UTF_8); + lines.add(temp); + lineTotalLen += temp.length(); + } rePos = pos + i + 1; if (overLen) { LOGGER.warn("readLines over len finally string len {}", @@ -451,11 +510,11 @@ private static long readLines(RandomAccessFile reader, long pos, List li overLen = true; } } - if (lines.size() >= maxLineCount) { + if (lines.size() >= maxLineCount || lineTotalLen >= maxLineTotalLen) { break; } } - if (lines.size() >= maxLineCount) { + if (lines.size() >= maxLineCount || lineTotalLen >= maxLineTotalLen) { break; } if (i == num) { @@ -477,7 +536,7 @@ private boolean isFirstStore(JobProfile jobConf) { isFirst = false; } } - LOGGER.info("is first store job {}, {}", file.getAbsolutePath(), isFirst); + LOGGER.info("isFirst {}, 
{}", file.getAbsolutePath(), isFirst); return isFirst; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java index 531b5d08215..33207f5bd2c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java @@ -18,7 +18,7 @@ package org.apache.inlong.agent.plugin.sources.reader.file; import org.apache.inlong.agent.common.AgentThreadFactory; -import org.apache.inlong.agent.core.task.TaskPositionManager; +import org.apache.inlong.agent.core.task.PositionManager; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -43,7 +43,9 @@ public final class MonitorTextFile { private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class); - // monitor thread pool + /** + * monitor thread pool + */ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, @@ -114,13 +116,17 @@ public void run() { long currentTime = System.currentTimeMillis(); if (expireTime != Long.parseLong(JOB_FILE_MONITOR_DEFAULT_EXPIRE) && currentTime - this.startTime > expireTime) { + LOGGER.info("monitor expire in {}", expireTime); break; } if (fileReaderOperator.inited) { listen(); } + fileReaderOperator.monitorUpdateTime = currentTime; TimeUnit.MILLISECONDS.sleep(interval); } + LOGGER.info("Job {} stop monitor {}", + fileReaderOperator.instanceId, fileReaderOperator.file.getAbsolutePath()); } catch (Exception e) { LOGGER.error(String.format("monitor %s error", fileReaderOperator.file.getName()), e); } @@ -164,7 +170,7 @@ private void listen() throws IOException { } /** - * reset the position and bytePosition + * Reset the 
position and bytePosition */ private void resetPosition() { LOGGER.info("reset position {}", fileReaderOperator.file.toPath()); @@ -173,26 +179,23 @@ private void resetPosition() { String jobInstanceId = fileReaderOperator.getJobInstanceId(); if (jobInstanceId != null) { - TaskPositionManager.getInstance().updateSinkPosition( - jobInstanceId, fileReaderOperator.getReadSource(), 0); + PositionManager.getInstance().updateSinkPosition( + jobInstanceId, fileReaderOperator.getReadSource(), 0, true); } } /** * Determine whether the inode has changed * - * @param currentFileKey - * @return + * @param currentFileKey current file key + * @return true if the inode changed, otherwise false */ private boolean isInodeChanged(String currentFileKey) { if (fileReaderOperator.fileKey == null || currentFileKey == null) { return false; } - if (fileReaderOperator.fileKey.equals(currentFileKey)) { - return false; - } - return true; + return !fileReaderOperator.fileKey.equals(currentFileKey); } } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java index ce02d1a21c0..113ea654654 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java @@ -23,7 +23,7 @@ import org.apache.inlong.agent.conf.TriggerProfile; import org.apache.inlong.agent.core.AgentManager; import org.apache.inlong.agent.core.HeartbeatManager; -import org.apache.inlong.agent.core.task.TaskPositionManager; +import org.apache.inlong.agent.core.task.PositionManager; import org.powermock.api.mockito.PowerMockito; import org.powermock.api.support.membermodification.MemberModifier; @@ -55,16 +55,16 @@ public MiniAgent() throws Exception { } private void init() throws Exception { - TaskPositionManager taskPositionManager = 
PowerMockito.mock(TaskPositionManager.class); + PositionManager positionManager = PowerMockito.mock(PositionManager.class); HeartbeatManager heartbeatManager = PowerMockito.mock(HeartbeatManager.class); ProfileFetcher profileFetcher = PowerMockito.mock(ProfileFetcher.class); - PowerMockito.doNothing().when(taskPositionManager, "start"); - PowerMockito.doNothing().when(taskPositionManager, "stop"); + PowerMockito.doNothing().when(positionManager, "start"); + PowerMockito.doNothing().when(positionManager, "stop"); PowerMockito.doNothing().when(heartbeatManager, "start"); PowerMockito.doNothing().when(heartbeatManager, "stop"); PowerMockito.doNothing().when(profileFetcher, "start"); PowerMockito.doNothing().when(profileFetcher, "stop"); - MemberModifier.field(AgentManager.class, "taskPositionManager").set(manager, taskPositionManager); + MemberModifier.field(AgentManager.class, "positionManager").set(manager, positionManager); MemberModifier.field(AgentManager.class, "heartbeatManager").set(manager, heartbeatManager); MemberModifier.field(AgentManager.class, "fetcher").set(manager, profileFetcher); } @@ -105,8 +105,8 @@ public void cleanupJobs() { public void cleanupTriggers() { triggerProfileCache - .forEach(triggerProfile -> manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(), - false)); + .forEach(triggerProfile -> manager.getTriggerManager() + .deleteTrigger(triggerProfile.getTriggerId(), false)); triggerProfileCache.clear(); } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java index 547c132f00e..13df91dc97e 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java @@ -75,7 +75,7 @@ public static void setup() { agent.start(); testRootDir = 
helper.getTestRootDir(); } catch (Exception e) { - LOGGER.error("setup failure"); + LOGGER.error("setup failure", e); } } From a5770ec4a5cc535fb3d68e5a60c1f292a1961272 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 15 Jun 2023 15:22:46 +0800 Subject: [PATCH 05/10] [INLONG-8251][agent] add global memory limit for file collect --- .../main/java/org/apache/inlong/agent/core/task/TaskManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java index 344cafba2eb..1aa03e7d943 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java @@ -81,6 +81,7 @@ public TaskManager(AgentManager agentManager) { this.taskMetrics = new AgentMetricItemSet(this.getClass().getSimpleName()); this.dimensions = new HashMap<>(); this.dimensions.put(KEY_COMPONENT_NAME, this.getClass().getSimpleName()); + MetricRegister.unregister(taskMetrics); MetricRegister.register(taskMetrics); tasks = new ConcurrentHashMap<>(); From 3802fa23f50d09b5250dec970c40cf4a20e84ccd Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 15 Jun 2023 16:44:11 +0800 Subject: [PATCH 06/10] [INLONG-8251][agent] add global memory limit for file collect add ut for memory manager --- .../agent/core/task/TestMemoryManager.java | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java new file mode 100644 index 00000000000..91182301152 --- /dev/null +++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.task; + +import org.apache.inlong.agent.conf.AgentConfiguration; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT; + +public class TestMemoryManager { + + private static AgentConfiguration conf; 
+ + @BeforeClass + public static void setup() throws Exception { + conf = AgentConfiguration.getAgentConf(); + } + + @Test + public void testAll() { + int sourcePermit = conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT); + int readerQueuePermit = conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT); + int channelPermit = conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT); + int writerPermit = conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT); + + boolean suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, sourcePermit); + Assert.assertTrue(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, readerQueuePermit); + Assert.assertTrue(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, channelPermit); + Assert.assertTrue(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, writerPermit); + Assert.assertTrue(suc); + + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 1); + Assert.assertFalse(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 1); + Assert.assertFalse(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 1); + Assert.assertFalse(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 1); + Assert.assertFalse(suc); + + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, sourcePermit); + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, readerQueuePermit); + MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, channelPermit); + MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, writerPermit); + + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, sourcePermit); + Assert.assertTrue(suc); + suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, readerQueuePermit); + Assert.assertTrue(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, channelPermit); + Assert.assertTrue(suc); + suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, writerPermit); + Assert.assertTrue(suc); + } + +} \ No newline at end of file From b06b17b77ed97277c69c62df26af5b555e5bcc8d Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 15 Jun 2023 17:16:37 +0800 Subject: [PATCH 07/10] [INLONG-8251][agent] add global memory limit for file collect --- .../main/java/org/apache/inlong/agent/core/job/JobManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java index 576d5682b6b..c1e933d8aea 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java @@ -108,6 +108,7 @@ public JobManager(AgentManager agentManager, JobProfileDb jobProfileDb) { this.dimensions = new HashMap<>(); this.dimensions.put(KEY_COMPONENT_NAME, this.getClass().getSimpleName()); this.jobMetrics = new AgentMetricItemSet(this.getClass().getSimpleName()); + MetricRegister.unregister(jobMetrics); MetricRegister.register(jobMetrics); } From 17ae91fe480f45f0a37cc7e7e62d7aa7aabcc61c Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 15 Jun 2023 20:08:02 +0800 Subject: [PATCH 08/10] [INLONG-8251][agent] add global memory limit for file collect --- .../agent/plugin/trigger/TestTriggerManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java index 8ea308ec69b..194f4d1e003 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java @@ -92,6 +92,7 @@ public void teardownEach() { @Test public void testRestartTriggerJobRestore() throws Exception { + TriggerProfile triggerProfile1 = TriggerProfile.parseJsonStr(FILE_JOB_TEMPLATE); triggerProfile1.set(JobConstants.JOB_ID, "1"); triggerProfile1.set(JobConstants.JOB_DIR_FILTER_PATTERNS, @@ -100,16 +101,15 @@ public void testRestartTriggerJobRestore() throws Exception { WATCH_FOLDER.newFolder("tmp"); TestUtils.createHugeFiles("1.log", WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd"); - System.out.println(" task size " + agent.getManager().getTaskManager().getTaskSize()); + System.out.println("testRestartTriggerJobRestore 1 task size " + agent.getManager().getTaskManager().getTaskSize()); await().atMost(10, TimeUnit.SECONDS).until(() -> agent.getManager().getTaskManager().getTaskSize() == 1); - + System.out.println("testRestartTriggerJobRestore 2 task size " + agent.getManager().getTaskManager().getTaskSize()); agent.restart(); + System.out.println("testRestartTriggerJobRestore 3 task size " + agent.getManager().getTaskManager().getTaskSize()); await().atMost(10, TimeUnit.SECONDS).until(() -> agent.getManager().getTaskManager().getTaskSize() == 1); - + System.out.println("testRestartTriggerJobRestore 4 task size " + agent.getManager().getTaskManager().getTaskSize()); // cleanup TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + "/1.log"); - TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + "/2.log"); - TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + "/tmp/3.log"); } @Test From 119a28fd4ac967623b6a885c79d882586a993353 Mon Sep 17 00:00:00 2001 From: 
wenweihuang Date: Thu, 15 Jun 2023 20:12:17 +0800 Subject: [PATCH 09/10] [INLONG-8251][agent] add global memory limit for file collect --- .../agent/plugin/trigger/TestTriggerManager.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java index 194f4d1e003..c591708c310 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java @@ -101,13 +101,17 @@ public void testRestartTriggerJobRestore() throws Exception { WATCH_FOLDER.newFolder("tmp"); TestUtils.createHugeFiles("1.log", WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd"); - System.out.println("testRestartTriggerJobRestore 1 task size " + agent.getManager().getTaskManager().getTaskSize()); + System.out.println( + "testRestartTriggerJobRestore 1 task size " + agent.getManager().getTaskManager().getTaskSize()); await().atMost(10, TimeUnit.SECONDS).until(() -> agent.getManager().getTaskManager().getTaskSize() == 1); - System.out.println("testRestartTriggerJobRestore 2 task size " + agent.getManager().getTaskManager().getTaskSize()); + System.out.println( + "testRestartTriggerJobRestore 2 task size " + agent.getManager().getTaskManager().getTaskSize()); agent.restart(); - System.out.println("testRestartTriggerJobRestore 3 task size " + agent.getManager().getTaskManager().getTaskSize()); + System.out.println( + "testRestartTriggerJobRestore 3 task size " + agent.getManager().getTaskManager().getTaskSize()); await().atMost(10, TimeUnit.SECONDS).until(() -> agent.getManager().getTaskManager().getTaskSize() == 1); - System.out.println("testRestartTriggerJobRestore 4 task size " + 
agent.getManager().getTaskManager().getTaskSize()); + System.out.println( + "testRestartTriggerJobRestore 4 task size " + agent.getManager().getTaskManager().getTaskSize()); // cleanup TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + "/1.log"); } From be14d8056609dfea2600cef545ba71a3f706f9e5 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 15 Jun 2023 20:39:00 +0800 Subject: [PATCH 10/10] [INLONG-8251][agent] add global memory limit for file collect --- .../agent/plugin/trigger/TestTriggerManager.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java index c591708c310..1e3a19f0770 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java @@ -101,17 +101,13 @@ public void testRestartTriggerJobRestore() throws Exception { WATCH_FOLDER.newFolder("tmp"); TestUtils.createHugeFiles("1.log", WATCH_FOLDER.getRoot().getAbsolutePath(), "asdqwdqd"); - System.out.println( - "testRestartTriggerJobRestore 1 task size " + agent.getManager().getTaskManager().getTaskSize()); + LOGGER.info("testRestartTriggerJobRestore 1 task size " + agent.getManager().getTaskManager().getTaskSize()); await().atMost(10, TimeUnit.SECONDS).until(() -> agent.getManager().getTaskManager().getTaskSize() == 1); - System.out.println( - "testRestartTriggerJobRestore 2 task size " + agent.getManager().getTaskManager().getTaskSize()); + LOGGER.info("testRestartTriggerJobRestore 2 task size " + agent.getManager().getTaskManager().getTaskSize()); agent.restart(); - System.out.println( - "testRestartTriggerJobRestore 3 task size " + agent.getManager().getTaskManager().getTaskSize()); - 
await().atMost(10, TimeUnit.SECONDS).until(() -> agent.getManager().getTaskManager().getTaskSize() == 1); - System.out.println( - "testRestartTriggerJobRestore 4 task size " + agent.getManager().getTaskManager().getTaskSize()); + LOGGER.info("testRestartTriggerJobRestore 3 task size " + agent.getManager().getTaskManager().getTaskSize()); + await().atMost(30, TimeUnit.SECONDS).until(() -> agent.getManager().getTaskManager().getTaskSize() == 1); + LOGGER.info("testRestartTriggerJobRestore 4 task size " + agent.getManager().getTaskManager().getTaskSize()); // cleanup TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + "/1.log"); }