Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug] why "runWorkerTaskH" executes in same thread for multiple jobs/connections? #2739

Open
madkote opened this issue Aug 9, 2023 · 2 comments

Comments

@madkote
Copy link

madkote commented Aug 9, 2023

hi all,

question about runWorkerTaskH, since I have a CPU intensive task and need to execute it in thread.

In the example bellow, for each WS connection a new worker task is started. Since the task will perform heavy computations, I expect that each connection will have its own new thread.

But logs and task manager show me a strange behavior, where computations for different WS are done on same thread???

Maybe I do something wrong?

/*
 * Case 01
 * -------
 * Worker task is executed always in same worker thread.
 *
 * How-To:
 * - open 4 parallel WS connection to service.
 * - in each connection send channel id, eg. "ch01", "ch02", ...
 * - observe logs
 * - worker thread name is always same
 */

import vibe.d;
import vibe.vibe;
import vibe.core.core;
import vibe.http.server;
import vibe.http.websockets;
import vibe.http.router;
import vibe.inet.url;

import core.time;
import core.thread : Thread;
import std.conv;

static void workerFuncPingPong(Task caller, string channel_id) nothrow {
	int counter = 5;
	try {
		logInfo("WORKER :: thread-id=%s caller=%s channel-id=%s THREAD=%s", thisTid, caller, channel_id, Thread.getThis().name);
		while (receiveOnly!string() == "ping" && --counter) {
			logInfo("%s :: %s :: pong=%s", Thread.getThis().name, channel_id, counter);
			caller.send("pong");
			sleep(2.seconds);
		}
		caller.send("goodbye");
	} catch (Exception e) assert(false, e.msg);
}

class WebsocketService {
	@path("/ws") void getWebsocket1(scope WebSocket ws){
		logInfo("X> connected=%s, ws=%s code=%s THREAD=%s", ws.connected, &ws, ws.closeCode, Thread.getThis().name);
		
		auto channel_id = ws.receiveText;
		logInfo("Receive channel '%s'.", channel_id);

		auto callee = runWorkerTaskH(&workerFuncPingPong, Task.getThis, channel_id);
		do {
			logInfo("ping");
			callee.send("ping");
		} while (receiveOnly!string() == "pong");
		logInfo("Client disconnected - worker is done. THREAD=%s", Thread.getThis().name);
	}
}

void helloWorld(HTTPServerRequest req, HTTPServerResponse res)
{	
    res.writeBody("Hello");
}

void main()
{
	logInfo("APP::CASE::01");
	auto router = new URLRouter;
	router.registerWebInterface(new WebsocketService());
	router.get("/hello", &helloWorld);

	auto settings = new HTTPServerSettings;
	settings.port = 8080;
	settings.bindAddresses = ["::1", "127.0.0.1"];

	auto listener = listenHTTP(settings, router);
	scope (exit)
	{
		listener.stopListening();
	}

	runApplication();
}

and here is console output

[main(----) INF] APP::CASE::01
[main(----) INF] Listening for requests on http://[::1]:8080/
[main(----) INF] Listening for requests on http://127.0.0.1:8080/
[main(8ZfR) INF] X> connected=true, ws=7F67F6D66DD8 code=0 THREAD=main
[main(W/zo) INF] X> connected=true, ws=7F67F5D65DD8 code=0 THREAD=main
[main(9a8B) INF] X> connected=true, ws=7F67F7D67DD8 code=0 THREAD=main
[main(KFgz) INF] X> connected=true, ws=7F67F8D68DD8 code=0 THREAD=main

[main(8ZfR) INF] Receive channel '3-fibonacci'.
[main(W/zo) INF] Receive channel '1-fibonacci'.
[main(9a8B) INF] Receive channel '2-fibonacci'.
[main(KFgz) INF] Receive channel '0-fibonacci'.

[vibe-4(wpkK) INF] WORKER :: thread-id=Tid(7f67fb24c8f0) caller=7F67FB239A00:1 channel-id=3-fibonacci THREAD=vibe-4
[vibe-14(Prm2) INF] WORKER :: thread-id=Tid(7f67fb24cd10) caller=7F67FB239C00:1 channel-id=1-fibonacci THREAD=vibe-14
[vibe-14(KRJj) INF] WORKER :: thread-id=Tid(7f67fb257000) caller=7F67FB239800:1 channel-id=2-fibonacci THREAD=vibe-14
[vibe-14(NkC2) INF] WORKER :: thread-id=Tid(7f67fb2570b0) caller=7F67FB239600:5 channel-id=0-fibonacci THREAD=vibe-14
[main(W/zo) INF] ping
[main(8ZfR) INF] ping
[main(9a8B) INF] ping
[main(KFgz) INF] ping
[vibe-14(Prm2) INF] vibe-14 :: 1-fibonacci :: pong=4
[vibe-4(wpkK) INF] vibe-4 :: 3-fibonacci :: pong=4
[vibe-14(KRJj) INF] vibe-14 :: 2-fibonacci :: pong=4
[vibe-14(NkC2) INF] vibe-14 :: 0-fibonacci :: pong=4
...

Above I would expect 4 different thread names since I have 4 ws connections running, but instead I have 4 connections and only 2 threads!

Thanks in advance for help and hints!

@madkote
Copy link
Author

madkote commented Aug 9, 2023

an add - sometimes it is just ONE thread for all connections

[vibe-10(nijw) INF] WORKER :: thread-id=Tid(7fdcc4e82c60) caller=7FDCC4E6D400:2 channel-id=2-fibonacci THREAD=vibe-10
[vibe-10(oxzy) INF] WORKER :: thread-id=Tid(7fdcc4e82dc0) caller=7FDCC4E5F800:2 channel-id=3-fibonacci THREAD=vibe-10
[vibe-10(orNI) INF] WORKER :: thread-id=Tid(7fdcc4e82e70) caller=7FDCC4E6D200:2 channel-id=1-fibonacci THREAD=vibe-10
[vibe-10(ofz5) INF] WORKER :: thread-id=Tid(7fdcc4e82f20) caller=7FDCC4E5FA00:29 channel-id=0-fibonacci THREAD=vibe-10

@s-ludwig
Copy link
Member

Worker tasks are processed by a fixed set of threads on a first-come-first-serve basis. By default, the number of threads matches the number of logical CPU cores of the system, but this can be customized at startup using vibe.core.core.setupWorkerThreads.

This is generally the most efficient way to handle such workloads, since it avoids the thread creation and resource overhead for a one-task-to-one-thread mapping. But especially if the workloads have a very different duration, this can indeed have the drawback of a non-optimal distribution across threads - and you also need to call vibe.core.core.yield periodically to ensure quasi-parallel processing of tasks within the same thread.

But if you really need one thread per task, instead of runWorkerTask, you can simply create a new Thread and perform the work directly, without any task involved. vibe.core.channel.Channel!T together with a free-list could be used to reuse threads and avoid the creation overhead.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants