Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ public abstract class Instance extends AbstractStateWrapper {
* get instance id
*/
public abstract String getInstanceId();

public abstract long getLastHeartbeatTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class InstanceManager extends AbstractDaemon {
private static final int ACTION_QUEUE_CAPACITY = 100;
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
private long lastPrintTime = 0;
// instance in db
private final InstanceDb instanceDb;
Expand Down Expand Up @@ -240,6 +241,11 @@ private void traverseMemoryTasksToDb() {
if (stateFromDb != InstanceStateEnum.DEFAULT) {
deleteFromMemory(instance.getInstanceId());
}
if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() > INSTANCE_KEEP_ALIVE_MS) {
LOGGER.error("instance heartbeat timeout, id: {}, will be deleted from memory",
instance.getInstanceId());
deleteFromMemory(instance.getInstanceId());
}
});
}

Expand Down Expand Up @@ -391,6 +397,11 @@ private void addToMemory(InstanceProfile instanceProfile) {
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
if (instanceMap.size() > instanceLimit) {
LOGGER.info("add instance to memory refused because instanceMap size over limit {}",
instanceProfile.getInstanceId());
return;
}
Class<?> taskClass = Class.forName(instanceProfile.getInstanceClass());
Instance instance = (Instance) taskClass.newInstance();
boolean initSuc = instance.init(this, instanceProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class TaskManager extends AbstractDaemon {
public static final int CONFIG_QUEUE_CAPACITY = 1;
public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_PRINT_TIME = 10000;
private static final int ACTION_QUEUE_CAPACITY = 100000;
private static final int ACTION_QUEUE_CAPACITY = 1000;
private long lastPrintTime = 0;
// task basic db
private final Db taskBasicDb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -66,6 +67,11 @@ public String getInstanceId() {
return profile.getInstanceId();
}

@Override
public long getLastHeartbeatTime() {
return AgentUtils.getCurrentTime();
}

@Override
public void addCallbacks() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class CommonInstance extends Instance {
private volatile boolean running = false;
private volatile boolean inited = false;
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private int heartbeatCheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();
protected long auditVersion;

Expand All @@ -72,15 +72,15 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) {
profile.getInstanceId(), profile.toJsonStr());
source = (Source) Class.forName(profile.getSourceClass()).newInstance();
source.init(profile);
source.start();
sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
sink.init(profile);
inited = true;
return true;
} catch (Throwable e) {
handleSourceDeleted();
handleDeleted();
doChangeState(State.FATAL);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e);
LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(),
e);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
return false;
}
Expand Down Expand Up @@ -117,10 +117,17 @@ public void run() {
}

private void doRun() {
source.start();
while (!isFinished()) {
if (!source.sourceExist()) {
handleSourceDeleted();
break;
if (handleDeleted()) {
break;
} else {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
continue;
}
}
Message msg = source.read();
if (msg == null) {
Expand All @@ -144,8 +151,8 @@ private void doRun() {
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
}
}
heartbeatcheckCount++;
if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatCheckCount++;
if (heartbeatCheckCount > HEARTBEAT_CHECK_GAP) {
heartbeatStatic();
}
}
Expand All @@ -156,7 +163,7 @@ private void heartbeatStatic() {
if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(),
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion);
heartbeatcheckCount = 0;
heartbeatCheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
}
Expand All @@ -169,16 +176,17 @@ private void handleReadEnd() {
}
}

private void handleSourceDeleted() {
private boolean handleDeleted() {
OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
profile.setState(InstanceStateEnum.DELETE);
profile.setModifyTime(AgentUtils.getCurrentTime());
InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
while (!isFinished() && !instanceManager.submitAction(action)) {
LOGGER.error("instance manager action queue is full: taskId {}",
instanceManager.getTaskId());
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
return instanceManager.submitAction(action);
}

@Override
public long getLastHeartbeatTime() {
return heartBeatStartTime;
}

@Override
Expand Down