[Bug][Agent] PulsarSink threadPool throw reject exception #7752

Closed
2 tasks done
wangpeix opened this issue Mar 31, 2023 · 0 comments · Fixed by #7753
Labels: component/agent, type/bug (Something is wrong)

wangpeix commented Mar 31, 2023

What happened

After removing the file data source on the dashboard and then adding a file data source in a new InLong group, the Agent throws the following exception:

023-03-31 22:06:50.408 -ERROR  [               task-job_11_2-6] o.a.i.a.c.t.TaskWrapper       :211 error while running wrapper
java.util.concurrent.RejectedExecutionException: Task org.apache.inlong.agent.plugin.sinks.PulsarSink$$Lambda$208/701747533@5f91f266 rejected from java.util.concurrent.ThreadPoolExecutor@43d6a181[Shutting down, pool size = 6, active threads = 6, queued tasks = 0, completed tasks = 2]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) ~[?:1.8.0_151]
	at org.apache.inlong.agent.plugin.sinks.PulsarSink.init(PulsarSink.java:153) ~[agent-plugins-1.7.0-SNAPSHOT.jar:1.7.0-SNAPSHOT]
	at org.apache.inlong.agent.core.task.Task.init(Task.java:76) ~[agent-core-1.7.0-SNAPSHOT.jar:1.7.0-SNAPSHOT]
	at org.apache.inlong.agent.core.task.TaskWrapper.run(TaskWrapper.java:203) ~[agent-core-1.7.0-SNAPSHOT.jar:1.7.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_151]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_151]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_151]
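
The `Shutting down` state in the exception message indicates that `shutdown()` had already been called on the executor when the new task arrived. The following is a minimal standalone sketch of that behaviour, not the Agent code: the class and method names are hypothetical, and the pool is configured like the one in the report but without the InLong-specific `AgentThreadFactory`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only illustrates the executor behaviour.
class ShutdownPoolRepro {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<>());

        pool.execute(() -> { });     // accepted: the pool is still running
        pool.shutdown();             // stands in for what destroy() does
        try {
            pool.execute(() -> { }); // stands in for what the next init() does
            return false;
        } catch (RejectedExecutionException e) {
            // The default AbortPolicy rejects tasks once the pool is shut down.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectsAfterShutdown());
    }
}
```

A `ThreadPoolExecutor` cannot be restarted after `shutdown()`, so every later `execute` call on the same instance is rejected.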

What you expected to happen

The Agent PulsarSink thread pool is global (a static field shared by all PulsarSink instances), like this:

public class PulsarSink extends AbstractSink {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerFactory.class);
    private static final AtomicInteger CLIENT_INDEX = new AtomicInteger(0);
    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("PulsarSink"));

When PulsarSink#destroy is called, the thread pool is shut down.

public void destroy() {
  ...
  EXECUTOR_SERVICE.shutdown();
  ...
}

A later PulsarSink#init call then submits its send thread to the already shut-down pool, which rejects the task and causes the exception above.

public void init(JobProfile jobConf) {
  ...
  EXECUTOR_SERVICE.execute(sendDataThread());
  ...
}
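
One possible way to avoid the conflict between `destroy` and a later `init` is to give each sink its own executor, so that shutting one down cannot affect another instance. The sketch below illustrates that idea under stated assumptions. It is not necessarily the change made in the fixing pull request, and `PerInstancePoolSink`, `sendLoop` and `awaitStarted` are hypothetical names, not part of InLong.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sink; only the executor lifecycle is modelled here.
class PerInstancePoolSink {

    // Owned by this instance (not static), created in init(), shut down in destroy().
    private ExecutorService executor;
    private volatile boolean shutdown = false;
    private final CountDownLatch started = new CountDownLatch(1);

    void init() {
        executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        executor.execute(this::sendLoop);
    }

    // Placeholder for the real send thread: idles until destroy() is called.
    private void sendLoop() {
        started.countDown();
        while (!shutdown) {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns true once the send loop has started, or false after a 5 second timeout. */
    boolean awaitStarted() {
        try {
            return started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void destroy() {
        shutdown = true;
        executor.shutdown(); // only this instance's pool is affected
    }

    public static void main(String[] args) {
        PerInstancePoolSink first = new PerInstancePoolSink();
        first.init();
        first.destroy();

        PerInstancePoolSink second = new PerInstancePoolSink();
        second.init(); // uses a fresh pool, so the task is accepted
        System.out.println("second sink started: " + second.awaitStarted());
        second.destroy();
    }
}
```

An alternative with the same effect would be to keep the static pool and never shut it down from `destroy`, letting each send thread exit on its own shutdown flag. Which option fits better depends on how the Agent manages sink lifecycles.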

How to reproduce

Remove the file data source on the dashboard, then add a file data source in a new InLong group. The Agent throws the exception.

Environment

No response

InLong version

master

InLong Component

InLong Agent

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct
