From c57f59636f6e24c90bc3b03b5c06fd8773261e3e Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Sun, 12 May 2024 14:40:27 +0800 Subject: [PATCH 1/4] [INLONG-10189][Agent] Handling SDK initialization exceptions --- .../apache/inlong/agent/plugin/Instance.java | 2 + .../agent/core/instance/InstanceManager.java | 10 +++++ .../inlong/agent/core/task/TaskManager.java | 2 +- .../agent/core/instance/MockInstance.java | 6 +++ .../agent/plugin/instance/CommonInstance.java | 38 +++++++++++-------- 5 files changed, 42 insertions(+), 16 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java index 990d7e60b26..0d43587f6eb 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java @@ -53,4 +53,6 @@ public abstract class Instance extends AbstractStateWrapper { * get instance id */ public abstract String getInstanceId(); + + public abstract long getLastHeartbeatTime(); } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index d388f2293e8..7af1a1dfaa6 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -54,6 +54,7 @@ public class InstanceManager extends AbstractDaemon { private static final int ACTION_QUEUE_CAPACITY = 100; public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; public static final int INSTANCE_PRINT_INTERVAL_MS = 10000; + public static final long INSTANCE_MAX_HEARTBEAT_GAP_MS = 5 * 60 * 1000; private long lastPrintTime = 0; // instance in db private final InstanceDb instanceDb; @@ -240,6 +241,10 @@ private void traverseMemoryTasksToDb() { if (stateFromDb != InstanceStateEnum.DEFAULT) { deleteFromMemory(instance.getInstanceId()); } + if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_MAX_HEARTBEAT_GAP_MS) { + LOGGER.error("instance heartbeat timeout {} will delete from memory", instance.getInstanceId()); + deleteFromMemory(instance.getInstanceId()); + } }); } @@ -391,6 +396,11 @@ private void addToMemory(InstanceProfile instanceProfile) { } LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr()); try { + if (instanceMap.size() > instanceLimit) { + LOGGER.info("add instance to memory refused because instanceMap size over limit {}", + instanceProfile.getInstanceId()); + return; + } Class taskClass = Class.forName(instanceProfile.getInstanceClass()); Instance instance = (Instance) taskClass.newInstance(); boolean initSuc = instance.init(this, instanceProfile); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java index eca7f6b25f4..7fded1fcbd1 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java @@ -58,7 +58,7 @@ public class TaskManager extends AbstractDaemon { public static final int CONFIG_QUEUE_CAPACITY = 1; public static final int CORE_THREAD_SLEEP_TIME = 1000; public static final int CORE_THREAD_PRINT_TIME = 10000; - private static final int ACTION_QUEUE_CAPACITY = 100000; + private static final int ACTION_QUEUE_CAPACITY = 1000; private long lastPrintTime = 0; // task basic db private final Db taskBasicDb; diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java index dc4e16bebcc..ea5a260b421 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java @@ -19,6 +19,7 @@ import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.plugin.Instance; +import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,11 @@ public String getInstanceId() { return profile.getInstanceId(); } + @Override + public long getLastHeartbeatTime() { + return AgentUtils.getCurrentTime(); + } + @Override public void addCallbacks() { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index e8d848b36b4..566fb7be444 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -57,7 +57,7 @@ public abstract class CommonInstance extends Instance { private volatile boolean running = false; private volatile boolean inited = false; private volatile int checkFinishCount = 0; - private int heartbeatcheckCount = 0; + private int heartbeatCheckCount = 0; private long heartBeatStartTime = AgentUtils.getCurrentTime(); protected long auditVersion; @@ -72,15 +72,15 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) { profile.getInstanceId(), profile.toJsonStr()); source = (Source) Class.forName(profile.getSourceClass()).newInstance(); source.init(profile); - source.start(); sink = (Sink) Class.forName(profile.getSinkClass()).newInstance(); sink.init(profile); inited = true; return true; } catch (Throwable e) { - handleSourceDeleted(); + handleDeleted(); doChangeState(State.FATAL); - LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e); + LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), + e); ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); return false; } @@ -117,10 +117,17 @@ public void run() { } private void doRun() { + source.start(); while (!isFinished()) { if (!source.sourceExist()) { - handleSourceDeleted(); - break; + if (handleDeleted()) { + break; + } else { + LOGGER.error("instance manager action queue is full: taskId {}", + instanceManager.getTaskId()); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + continue; + } } Message msg = source.read(); if (msg == null) { @@ -144,8 +151,8 @@ private void doRun() { AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS); } } - heartbeatcheckCount++; - if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) { + heartbeatCheckCount++; + if (heartbeatCheckCount > HEARTBEAT_CHECK_GAP) { heartbeatStatic(); } } @@ -156,7 +163,7 @@ private void heartbeatStatic() { if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(), profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion); - heartbeatcheckCount = 0; + heartbeatCheckCount = 0; heartBeatStartTime = AgentUtils.getCurrentTime(); } } @@ -169,16 +176,17 @@ private void handleReadEnd() { } } - private void handleSourceDeleted() { + private boolean handleDeleted() { OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId()); profile.setState(InstanceStateEnum.DELETE); profile.setModifyTime(AgentUtils.getCurrentTime()); InstanceAction action = new InstanceAction(ActionType.DELETE, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", - instanceManager.getTaskId()); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - } + return instanceManager.submitAction(action); + } + + @Override + public long getLastHeartbeatTime() { + return heartBeatStartTime; } @Override From 4413488a3898af333f90bc215cb0e30a603ed546 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 13 May 2024 10:15:55 +0800 Subject: [PATCH 2/4] [INLONG-10189][Agent] Modify based on comments --- .../apache/inlong/agent/core/instance/InstanceManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 7af1a1dfaa6..ea16d997f6d 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -54,7 +54,7 @@ public class InstanceManager extends AbstractDaemon { private static final int ACTION_QUEUE_CAPACITY = 100; public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; public static final int INSTANCE_PRINT_INTERVAL_MS = 10000; - public static final long INSTANCE_MAX_HEARTBEAT_GAP_MS = 5 * 60 * 1000; + public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000; private long lastPrintTime = 0; // instance in db private final InstanceDb instanceDb; @@ -241,7 +241,7 @@ private void traverseMemoryTasksToDb() { if (stateFromDb != InstanceStateEnum.DEFAULT) { deleteFromMemory(instance.getInstanceId()); } - if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_MAX_HEARTBEAT_GAP_MS) { + if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_KEEP_ALIVE_MS) { LOGGER.error("instance heartbeat timeout {} will delete from memory", instance.getInstanceId()); deleteFromMemory(instance.getInstanceId()); } From 093b3e807ec3ab636cc5d5a1d18909cb8bdd6438 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 13 May 2024 10:24:12 +0800 Subject: [PATCH 3/4] Update inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java Co-authored-by: AloysZhang --- .../org/apache/inlong/agent/core/instance/InstanceManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index ea16d997f6d..2916d54d99a 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -242,7 +242,7 @@ private void traverseMemoryTasksToDb() { deleteFromMemory(instance.getInstanceId()); } if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_KEEP_ALIVE_MS) { - LOGGER.error("instance heartbeat timeout {} will delete from memory", instance.getInstanceId()); + LOGGER.error("instance heartbeat timeout, id: {}, will be deleted from memory", instance.getInstanceId()); deleteFromMemory(instance.getInstanceId()); } }); From 7b17b09c50c0cfde42fd06e2a48acf68453a9707 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 13 May 2024 10:26:01 +0800 Subject: [PATCH 4/4] [INLONG-10189][Agent] Modify based on comments --- .../org/apache/inlong/agent/core/instance/InstanceManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 2916d54d99a..25ce136ddf1 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -242,7 +242,8 @@ private void traverseMemoryTasksToDb() { deleteFromMemory(instance.getInstanceId()); } if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_KEEP_ALIVE_MS) { - LOGGER.error("instance heartbeat timeout, id: {}, will be deleted from memory", instance.getInstanceId()); + LOGGER.error("instance heartbeat timeout, id: {}, will be deleted from memory", + instance.getInstanceId()); deleteFromMemory(instance.getInstanceId()); } });