How to multithread postprocess framesets in C++? #6865

Closed
Darustc4 opened this issue Jul 20, 2020 · 16 comments

@Darustc4

Required Info
Camera Model: D435
Firmware Version: (Open RealSense Viewer --> Click info)
Operating System & Version: Linux (Raspbian Buster)
Kernel Version (Linux Only): 10
Platform: Raspberry Pi 4B, 4 GB
SDK Version: 2.0
Language: C++
Segment: other

Hello, I am working on an application that requires obtaining a frameset from the camera, applying filters to it, exporting it to PLY, compressing it to Draco format, and sending it to a server.

The speed requirements of the system are too high for single-threaded processing (I get around 8 fps at best), so I've tried creating a single-producer, multiple-consumer multithreaded model where frames are stored until a consumer thread grabs one for postprocessing. (The order in which the frames are sent out is irrelevant.)

Even though I'm calling .keep() at every single processing step on the framesets, depth frames, video frames and point sets, internal errors and segfaults keep happening randomly (no error is registered in the log files), so I guess this is not the way to go.

Is there any multithreaded processing example code I can follow? I've been looking for older issues on this matter and found no relevant posts.

I can post a code snippet if requested.

Looking forward to any help or tips.

@MartyG-RealSense
Collaborator

MartyG-RealSense commented Jul 20, 2020

Hi @Darustc4, can you help me to visualise how you are using the keep() instruction, please?

The traditional use of keep() is to store frames in memory as they are generated instead of recording them continuously to file. When the pipeline is closed, actions such as post-processing and alignment can then be applied to the entire set of frames and the updated frames saved to a file such as a ply.

Because keep() consumes memory capacity quite quickly, it is suited to recording short sequences. A desktop computer with a large amount of memory may be able to store a recorded session of up to 30 seconds. For a device with a relatively low memory capacity (such as your Pi 4's 4 GB) I would expect 10 seconds to be more practical.

Could you please explain more about what you mean when you say "I'm calling .keep() on every single processing step"? Thanks!

The link below directs to a useful discussion about the mechanics of keep().

#1000 (comment)
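
To make that traditional pattern concrete, here is a minimal sketch of the record-then-batch-process use of keep(); the 5-second duration and output file names are arbitrary placeholder choices, not something taken from your code:

#include <librealsense2/rs.hpp>
#include <chrono>
#include <string>
#include <vector>

int main(){
	rs2::pipeline pipe;
	pipe.start();

	// Record for a few seconds, keeping every frameset alive in memory
	std::vector<rs2::frameset> kept;
	auto start = std::chrono::steady_clock::now();
	while (std::chrono::steady_clock::now() - start < std::chrono::seconds(5)){
		rs2::frameset fs = pipe.wait_for_frames();
		fs.keep();            // prevent the SDK from recycling this frameset's memory
		kept.push_back(fs);
	}
	pipe.stop();

	// Only now, with the pipeline closed, post-process and export the stored frames
	rs2::pointcloud pc;
	int i = 0;
	for (auto& fs : kept){
		pc.map_to(fs.get_color_frame());
		rs2::points points = pc.calculate(fs.get_depth_frame());
		points.export_to_ply("frame_" + std::to_string(i++) + ".ply", fs.get_color_frame());
	}
	return 0;
}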

@Darustc4
Author

Hello Marty, thanks for the quick response. Below is a trimmed version of my code with additional comments; you can see how I use the keep() function in the PointCloudProcessor class and in FrameTimestamped:

// Includes needed by this trimmed snippet (the Transmitter and Compressor headers are omitted)
#include <librealsense2/rs.hpp>
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdio>   // std::remove (file deletion)
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Thread-safe bounded buffer, used to store framesets
template <typename T>
class Buffer{
public:
	void add(T item) {
		std::unique_lock<std::mutex> locker(mu);
		cond.wait(locker, [this](){return buffer_.size() < size_;});
		buffer_.push_back(item);
		locker.unlock();
		cond.notify_all();
	}
	T remove() {
		std::unique_lock<std::mutex> locker(mu);
		cond.wait(locker, [this](){return buffer_.size() > 0;});
		T item = buffer_.back();
		buffer_.pop_back();
		locker.unlock();
		cond.notify_all();
		return item;
	}
	Buffer(const unsigned int bufferSize): size_(bufferSize){}
private:
	std::mutex mu;
	std::condition_variable cond;

	std::deque<T> buffer_;
	const unsigned int size_;
};


// Node stored in the thread-safe buffer: the frameset plus a timestamp used for transmission to the server
// Note that the constructor also calls keep() on the stored frame
struct FrameTimestamped{
	FrameTimestamped(rs2::frameset frame, unsigned long long time): frame(frame), time(time){ frame.keep(); }
	rs2::frameset frame;
	unsigned long long time;
};


// Class that contains the consumer threads
class PointCloudProcessor{
public:
	PointCloudProcessor(unsigned threads, const std::vector<rs2::filter> &filts, const std::string &tmpFold, Transmitter &transmitter, Compressor &compressor):
	maxThreads(threads), filters(filts), tmpFolder(tmpFold), frameBuffer(threads*2), transmitter(transmitter), compressor(compressor){
		
		// Launch all consumer threads; they will all block since there is no data in the queue yet
		for(unsigned i = 0; i < threads; ++i) threadVector.push_back(std::thread( [=]{this->pointcloudConsumer(i);} ));
	}
	
	// Called by the producer thread to add a new frameset for processing
	void processNewFrameSet(rs2::frameset& frameSet){
		frameSet.keep();
		frameBuffer.add({frameSet, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()});
	}

private:
	//Attributes

	Transmitter &transmitter;
	Compressor &compressor;
	
	unsigned int maxThreads;
	std::string tmpFolder;

	Buffer<FrameTimestamped> frameBuffer;
	std::vector<std::thread> threadVector;

	std::vector<rs2::filter> filters;
	

	// Functions
	
	// Every consumer thread will be executing this method forever
	void pointcloudConsumer(int threadId){

		while(true){
			
			try{
				auto timeFrame = frameBuffer.remove(); // Get a frame from the Queue
				auto frame = timeFrame.frame;
				frame.keep();						   
				auto depth = frame.get_depth_frame();
				depth.keep();
				auto video = frame.get_color_frame();
				video.keep();
				if (depth && video){

					for (auto&& filter : filters){
						depth = filter.process(depth);
						// This keep() solves nothing; errors still pop up occasionally when applying filters, hence the try/catch around the whole block
						depth.keep();	 
					}


					// Tell pointcloud object to map to this color frame
					rs2::pointcloud pc;
					pc.map_to(video);

					// Generate the pointcloud and texture mappings
					rs2::points points = pc.calculate(depth);
					points.keep();
					
					std::string plyName = tmpFolder + std::to_string(timeFrame.time) + "_" + std::to_string(threadId) + ".ply";

					points.export_to_ply(plyName, video);
					
					std::string dracoFileName = compressor.compressFile(plyName);
					
					transmitter.sendDataToServer(dracoFileName);
					
					std::remove(plyName.c_str());
					std::remove(dracoFileName.c_str());
				}
			}catch(const std::runtime_error &e){
				// When executing this postprocessing code, runtime_errors thrown by the librealsense code pop up while applying the filters to the depth frame.
				// I assume this is because the frame was not kept correctly, and has therefore been deleted mid-processing by the RealSense code.
			}
		}
	}
};


int main(int argc, char * argv[]){
	try{
		std::cout << "Starting pipe...\n";

		////////////// Read the JSON files
		
		// Omitted reading and variable declarations

		rs2::config cfg;
		cfg.enable_stream(RS2_STREAM_COLOR, -1, xRes, yRes, rs2_format::RS2_FORMAT_RGB8);
		cfg.enable_stream(RS2_STREAM_DEPTH, -1, xRes, yRes, rs2_format::RS2_FORMAT_Z16);

		// Declare filters. https://dev.intelrealsense.com/docs/rs-post-processing
		rs2::threshold_filter thresh_filter(0.01f, maxDepth);                    // Threshold  - Cuts off background
		rs2::decimation_filter dec_filter(decimationLv);                         // Decimation - reduces depth frame density
		rs2::spatial_filter spat_filter(spatSmAl, spatSmDe, spatMagnitude, 0);   // Spatial    - edge-preserving spatial smoothing
		rs2::temporal_filter temp_filter(tempSmAl, tempSmDe, 0);                 // Temporal   - reduces temporal noise
		rs2::hole_filling_filter hole_filter(2);                                 // HoleFill   - fills holes, what did you expect?

		std::vector<rs2::filter> filters;

		// The following insertion order dictates the order in which the filters are applied
		filters.push_back(thresh_filter);
		filters.push_back(dec_filter);
		if(enableSpatialFilter) filters.push_back(spat_filter);
		if(enableTemporalFilter) filters.push_back(temp_filter);
		if(enableHoleFilling) filters.push_back(hole_filter);

		// Declare RealSense pipeline, encapsulating the actual device and sensors
		rs2::pipeline pipe;

		// Start streaming with default recommended configuration
		auto profile = pipe.start(cfg);

		// The depth and color sensors sit at two different positions within the RealSense camera,
		// so their frames need to be aligned for the data to match

		//Pipeline could choose a device that does not have a color stream
		//If there is no color stream, choose to align depth to another stream
		rs2_stream align_to = find_stream_to_align(profile.get_streams());

		rs2::align align(align_to);

		std::cout << "Pipe started\nLoading configuration...\n";


		/////////////////////// Transmission Config ///////////////////////

		// Omitted more JSON file reading and declarations

		Transmitter transmitter(username, password, host);

		//////////////////// Multithreading config //////////////////////////
		
		Compressor compressor(dracoCompression, dracoQuant);
		
		int nThreads = std::thread::hardware_concurrency()*2;
		if(nThreads == 0) nThreads = 8;
		 
		PointCloudProcessor pointCloudProcessor(nThreads, filters, plyTmpDir, transmitter, compressor); 

		std::cout << "Configuration Loaded\nStarting sender with " << nThreads << " threads.\n\n";


		while (true){
			//////////////////// Obtain colored mesh from Realsense ////////////////////////
			
			// Wait for the next set of frames from the camera
			rs2::frameset frames = pipe.wait_for_frames();

			if (profile_changed(pipe.get_active_profile().get_streams(), profile.get_streams())){
				//If the profile was changed, update the align object, and also get the new device's depth scale
				//This basically never happens, but you can never be too careful
				
				profile = pipe.get_active_profile();
				align_to = find_stream_to_align(profile.get_streams());
				align = rs2::align(align_to);
			}


			// Align all frames to depth viewport
			frames = align.process(frames);

			pointCloudProcessor.processNewFrameSet(frames); // Add the frame to the Queue
			
		}

		return EXIT_SUCCESS;

	}catch (const rs2::error & e){
		std::cerr << "RealSense error calling " << e.get_failed_function() << "(" << e.get_failed_args() << "):\n    " << e.what() << std::endl;
		return EXIT_FAILURE;

	}catch (const std::exception & e){
		std::cerr << e.what() << std::endl;
		return EXIT_FAILURE;
	}
}

rs2_stream find_stream_to_align(const std::vector<rs2::stream_profile>& streams){
	//Given a vector of streams, we try to find a depth stream and another stream to align depth with.
	//We prioritize color streams to make the view look better.
	//If color is not available, we take any other stream (other than depth).
	rs2_stream align_to = RS2_STREAM_ANY;
	bool depth_stream_found = false;
	bool color_stream_found = false;

	for (rs2::stream_profile sp : streams){
		rs2_stream profile_stream = sp.stream_type();
		if (profile_stream != RS2_STREAM_DEPTH){
			if (!color_stream_found) //Prefer color
			align_to = profile_stream;

			if (profile_stream == RS2_STREAM_COLOR){
				color_stream_found = true;
			}
		}else{
			depth_stream_found = true;
		}
	}

	if (!depth_stream_found)
	throw std::runtime_error("No Depth stream available");

	if (align_to == RS2_STREAM_ANY)
	throw std::runtime_error("No stream found to align with Depth");

	return align_to;
}

bool profile_changed(const std::vector<rs2::stream_profile>& current, const std::vector<rs2::stream_profile>& prev){
	for(auto&& sp : prev){
		//Check whether each previous profile is still present in current (a new stream may simply have been added)
		auto itr = std::find_if(std::begin(current), std::end(current), [&sp](const rs2::stream_profile& current_sp) { return sp.unique_id() == current_sp.unique_id(); });
		if (itr == std::end(current)) //If previous stream wasn't found in current
		{
		  return true;
		}
	}
	return false;
}

@MartyG-RealSense
Collaborator

MartyG-RealSense commented Jul 20, 2020

I should start by emphasising that my librealsense programming knowledge is not as advanced as that of other Intel RealSense team members on this forum, so please be patient with me. :)

Going through your script, the first thing that got my attention was that you are using an index of -1 for the depth and color streams.

cfg.enable_stream(RS2_STREAM_COLOR, -1, xRes, yRes, rs2_format::RS2_FORMAT_RGB8);
cfg.enable_stream(RS2_STREAM_DEPTH, -1, xRes, yRes, rs2_format::RS2_FORMAT_Z16);

These stream types usually do not need an index number, as there is only one of them per camera. Index numbers are usually used with infrared streams as there are two of them (left and right). So it should be sufficient to just define the streams like this:

cfg.enable_stream(RS2_STREAM_COLOR, xRes, yRes, RS2_FORMAT_RGB8);
cfg.enable_stream(RS2_STREAM_DEPTH, xRes, yRes, RS2_FORMAT_Z16);

You can also define a specific FPS speed for the streams by adding a number value at the end. For example, to set the streams to 30 FPS:

cfg.enable_stream(RS2_STREAM_COLOR, xRes, yRes, RS2_FORMAT_RGB8, 30);
cfg.enable_stream(RS2_STREAM_DEPTH, xRes, yRes, RS2_FORMAT_Z16, 30);

@Darustc4
Author

Darustc4 commented Jul 20, 2020

I've been testing the first configuration you suggest, and it results in highly degraded output: the image has holes everywhere and very irregular surfaces.

The resulting mesh using my current cfg (with the -1 index):
[image: good result]

The resulting mesh using your suggested cfg (without the -1 and without specifying the fps rate):
[image: broken result]

Furthermore, if I try to specify the fps at the resolution 424x240 (the same one I've been using), I get the following error:
RealSense error calling rs2_pipeline_start_with_config(pipe:0x92ccd8, config:0x91f630):
Failed to get control format size!

While these errors are quite the mystery, they don't seem to be the cause of the main problem, which is the errors randomly happening when postprocessing (applying the filters to) the frameset inside the PointCloudProcessor class; they happen every few processed frames.

@MartyG-RealSense
Collaborator

I went over your script carefully, though some concepts in it exceed my current level of coding knowledge. It may be worth reviewing the keep() script in the link below, if you have not seen it already, to see if it gives some ideas for your own script:

#1942 (comment)

@Darustc4
Author

Yes, I've already checked that post, but sadly I don't see what I'm doing wrong.
Anyhow, I greatly appreciate the time you invested in checking my code.

@MartyG-RealSense
Collaborator

I will seek advice from Intel about this case. Thanks so much for your patience.

@Darustc4
Author

Darustc4 commented Jul 30, 2020

Is there anything I can provide to facilitate testing or finding a solution? I need to get this problem fixed soon. I've tried multiple variations of my code, but all of them end up producing strange errors in the feed, the most notable being the one shown with the chair a few posts above.

I've noticed that, depending on the configuration I use for the camera, the feed is either stable or broken. For example:

This configuration is ok:
"xResol": 424,
"yResol": 240,
"maxDepth": 1.3,
"decimationLevel": 2,
"enableSpatialFilter": true,
"spatialMagnitude": 4,
"spatialSmoothAlpha": 0.6,
"enableTemporalFilter": true,
"spatialSmoothDelta": 8.0,
"temporalSmoothAlpha": 0.5,
"temporalSmoothDelta": 20.0,
"enableHoleFilling": false

This configuration results in a very glitchy and broken image:
"xResol": 424,
"yResol": 240,
"maxDepth": 0.8,
"decimationLevel": 2,
"enableSpatialFilter": true,
"spatialMagnitude": 4,
"spatialSmoothAlpha": 0.6,
"enableTemporalFilter": false,
"spatialSmoothDelta": 8.0,
"temporalSmoothAlpha": 0.5,
"temporalSmoothDelta": 20.0,
"enableHoleFilling": false

@RealSenseSupport
Collaborator

Hi @Darustc4,

Please clarify what you mean by:
"This configuration is ok:
"xResol": 424,
"yResol": 240,
"maxDepth": 1.3,
"decimationLevel": 2,
"enableSpatialFilter": true,
"spatialMagnitude": 4,
"spatialSmoothAlpha": 0.6,
"enableTemporalFilter": true,
"spatialSmoothDelta": 8.0,
"temporalSmoothAlpha": 0.5,
"temporalSmoothDelta": 20.0,
"enableHoleFilling": false"

Do you mean that you don't see the "internal errors and segfaults" mentioned in the question description? Do you get the desired result when you use this configuration?

Thank you!

@Darustc4
Author

Sorry for not being clear enough.

The internal errors happen no matter what I do with the camera configuration, but they're somewhat handled by the try/catch I've set up around the frame processing block, even though this reduces the fps since some frames go to waste when it happens.

When I say that some configurations are ok, I mean that the feed looks normal (no holes or extreme noise); with a bad configuration the feed is completely broken and segfaults keep appearing on almost every processed frame (yet the frames are still sent out to the server, somehow).

I've also tried a version in which the main thread applies the filters and exports the processed frame to .ply (leaving the job of compressing the .ply file and sending it to the server to the consumer threads). This successfully avoids the internal errors when applying the filters, but some configurations are still very broken, as mentioned above.

@RealSenseSupport
Collaborator

@Darustc4,

In order to save time and solve the matter in the most efficient way, please describe in detail what the problem is and the exact scenario that leads to it. On the one hand you're saying that "some configurations are ok", and on the other hand you're saying that "the internal errors happen no matter what I do with the camera configuration".
In your description of the issue, please provide exact information about each configuration and what the problem with that configuration is, including any error messages you see, logs, and any other details relevant to the issue.
Along with these details, please provide accurate information about your system configuration:
how many cameras are in active use in the system, the FW version and camera model of each camera,
the build number of RealSense SDK 2.0, information about the wrapper version if you use any, HW configuration details (platform names, how the camera(s) is/are connected, etc.), the Linux distribution name and version, and the Linux kernel version.
It would also be helpful if you provide information about the installation process instructions you followed (how the kernel was patched, backend information, etc.).

Thank you!

@RealSenseSupport
Collaborator

Do you still need help with this question?

@Darustc4
Author

Darustc4 commented Feb 9, 2021

After a very long pause in the project I've come back to it, hoping the SDK had been updated and this multithreading problem fixed. I wasn't so lucky, although part of the problem was solved.

Now every configuration works without the holes I posted above, but the multithreaded processing, especially applying filters and exporting the mesh to .ply in slave threads, generates runtime exceptions, which makes the program lose frames and eventually lag to the point of being unusable. Also, if I increase the resolution of the feeds the problem happens more often; I presume this is because more points need to be iterated over, giving more chances of raising an exception.

In regards to the information @RealSenseSupport requested, I will post it soon, including instructions on how to set up a Raspberry Pi from scratch. But isn't there any working code example for accelerating frame processing? I feel it could be a great addition to the Examples collection. The rs-post-processing example uses a single thread for grabbing frames while the main thread handles the UI, but it can't be scaled into a master-slave model, so it isn't of much use (I think).

Thanks for the patience.

@MartyG-RealSense
Collaborator

Hi @Darustc4 librealsense supports the acceleration of operations such as alignment and point cloud generation by "offloading" processing from the CPU to the GPU. This can be done with librealsense's CUDA support for devices with an Nvidia GPU (which would exclude the Pi 4 if you are still using that board) or with the SDK's GLSL feature (which is 'vendor neutral' and can work with any GPU brand). GLSL may not provide noticeable improvement on low-end devices, though.

GLSL support can be added to librealsense applications relatively simply by adding gl:: to rs2 instructions. So for example, rs2::pointcloud can become rs2::gl::pointcloud to add GLSL support for point cloud processing.
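
As a rough illustration of the swap (this assumes librealsense was built with its GLSL extensions enabled; the required OpenGL context and initialisation calls are shown in the rs-gl example linked below):

#include <librealsense2/rs.hpp>
#include <librealsense2-gl/rs_processing_gl.hpp>   // GLSL processing blocks

// ... create an OpenGL context and call the rs2::gl initialisation functions
// exactly as the rs-gl example does, then:

rs2::gl::pointcloud pc;         // GPU-accelerated drop-in for rs2::pointcloud
rs2::gl::colorizer colorizer;   // GPU-accelerated drop-in for rs2::colorizer

// Usage stays the same as the CPU versions:
// pc.map_to(color_frame);
// rs2::points points = pc.calculate(depth_frame);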

The rs-gl SDK example program demonstrates the generation of a point cloud with GLSL:

https://github.com/IntelRealSense/librealsense/tree/master/examples/gl

The link below also has a very good 'pros and cons' analysis of GLSL:

#7316

@Darustc4
Author

Very interesting, but right now most of the processing time goes into converting to .ply and compressing to Draco, and I don't think that falls within the graphics-acceleration scope of the SDK.

I've remodeled my whole threading system to use a segmented pipeline:
[1 frame grabber that applies filters] > BUFFER > [N .PLY converters] > BUFFER > [N Draco compressors] > BUFFER > [1 transmitter]

This now works well without errors; if anyone is interested in the code I can post it. But I still do not understand why my previous model wasn't working. I think the RealSense team should check how the keep() function operates, since it doesn't seem to work correctly.
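
For anyone landing here later, a rough, condensed sketch of how such a staged pipeline could be wired up, reusing the Buffer, filters, Compressor and Transmitter from the code posted earlier in this thread (here the .ply export is folded into the grabber stage, and nCompressorThreads plus the queue sizes are placeholder choices, not my exact code):

// Only std::string file paths cross thread boundaries, so no rs2 frame object
// is ever shared between threads.
Buffer<std::string> plyQueue(8);     // paths of exported .ply files
Buffer<std::string> dracoQueue(8);   // paths of compressed Draco files

// Stage 1 (single thread): grab, align, filter and export to .ply
std::thread grabber([&]{
	rs2::pointcloud pc;
	unsigned long long frameCounter = 0;
	while (true){
		rs2::frameset frames = align.process(pipe.wait_for_frames());
		rs2::frame depth = frames.get_depth_frame();
		for (auto&& filter : filters) depth = filter.process(depth);
		pc.map_to(frames.get_color_frame());
		rs2::points points = pc.calculate(depth);
		std::string plyName = tmpFolder + std::to_string(frameCounter++) + ".ply";
		points.export_to_ply(plyName, frames.get_color_frame());
		plyQueue.add(plyName);
	}
});

// Stage 2 (N threads): compress each .ply to Draco
std::vector<std::thread> compressors;
for (unsigned i = 0; i < nCompressorThreads; ++i)
	compressors.emplace_back([&]{
		while (true) dracoQueue.add(compressor.compressFile(plyQueue.remove()));
	});

// Stage 3 (single thread): transmit the compressed files to the server
std::thread sender([&]{
	while (true) transmitter.sendDataToServer(dracoQueue.remove());
});
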

@MartyG-RealSense
Collaborator

MartyG-RealSense commented Feb 10, 2021

Thanks very much @Darustc4 for sharing details of your method with the RealSense community!

Writing to ply is known to be a slow method of saving data to file (with bag files being more efficient to write, though much larger in size), so slowness of ply generation may not necessarily be caused by a problem in your program.

#2149 (comment)

In regard to Keep(), it is designed to store frames in the computer's memory instead of continuously writing them to file, so that batch-processing can be performed on all the stored frames after the pipeline is closed (e.g. post-process and/or align and save the frames in a single action). Because Keep() continuously consumes memory capacity during streaming as it stores frames, it is suited to short recording durations of up to around 30 seconds.
