From cef02bd42b1ecf05706bbbecaf20bf4aec8ce12e Mon Sep 17 00:00:00 2001 From: fnoop Date: Tue, 21 Aug 2018 22:42:07 +0100 Subject: [PATCH] Refactor threading and latency handling --- src/CMakeLists.txt | 6 +- src/pkQueueTS.hpp | 348 ++++++++++++++++++++++++++++++++++++++++++ src/track_targets.cpp | 224 +++++++++++++++++++-------- vision_landing | 123 ++++++++++----- 4 files changed, 598 insertions(+), 103 deletions(-) create mode 100644 src/pkQueueTS.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9204265..868bc06 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,10 +10,12 @@ ENDIF() add_executable(track_targets track_targets.cpp) -set(cpp_compile_flags "-std=gnu++11") +set(EXTRA_C_FLAGS_RELEASE "${EXTRA_C_FLAGS_RELEASE} -std=c++0x -pthread -march=armv8-a+crc -mfpu=neon-vfpv4 -mtune=cortex-a53 -ftree-vectorize -mfloat-abi=hard -O3 ") +set(cpp_compile_flags "-std=gnu++11 -pthread") add_definitions(${cpp_compile_flags}) include_directories(${aruco_INCLUDE_DIRS} ${OpenCV_INCLUDE_DIRS}) -target_link_libraries(track_targets aruco ${aruco_LIBS} ${OpenCV_LIBS}) +target_link_libraries(track_targets aruco ${aruco_LIBS} ${OpenCV_LIBS} pthread) link_directories(${aruco_LIB_DIR} ${OpenCV_INSTALL_PATH}/lib}) +link_libraries(pthread) install(PROGRAMS track_targets DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/..) diff --git a/src/pkQueueTS.hpp b/src/pkQueueTS.hpp new file mode 100644 index 0000000..471ed9a --- /dev/null +++ b/src/pkQueueTS.hpp @@ -0,0 +1,348 @@ +/************************************************************************************** +* +* IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. +* +* By downloading, copying, installing or using the software you agree to this license. +* If you do not agree to this license, do not download, install, +* copy or use the software. +* +* License Agreement +* For pkQueueTS +* +* pkQueueTS - http://pklab.net/index.php?id=395 +* Copyright (C) 2016 PkLab.net all rights reserved. +* +* Third party copyrights are property of their respective owners. +* +* Redistribution and use in source and binary forms, with or without modification, +* are permitted provided that the following conditions are met: +* +* 1. Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright notice, +* this list of conditions and the following disclaimer in the documentation +* and/or other materials provided with the distribution. +* 3. The name of the author may not be used to endorse or promote products +* derived from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED +* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*****************************************************************************************/ + +#ifndef PKQUEUETS_H +#define PKQUEUETS_H + +#if __cplusplus < 199711L +#error "This file requires C++ 11 !" +#endif + +#include +#include +#include +#include +#include + +#include +#include + + +using namespace std; +using namespace cv; + +//! \defgroup pkQueueTS A Thread safe queue by PkLab +//! @{ + +/** \brief Thread Safe Queue result codes + */ +enum pkQueueResults +{ + PK_QTS_OK, //!< The result is correct + PK_QTS_TIMEOUT, //!< A time-out has reached while push or pop + PK_QTS_EMPTY, //!< The queue is empty and we wont wait +}; + +/** \brief Simple structure for an OpenCV Mat with deep copy constructor and assignment operator. +* +* This class encapsulates a `cv::Mat` and provides custom assignment operator and +* copy constructor to perform deep copy of matrix data too. +* +* This is because the `cv::Mat` assignment operator and the copy constructor +* just copy the Mat header but not the matrix data. +* +* This class is useful for Mats buffering. For example: +* - `std::queue::push(cv::Mat)` just creates a copy of `cv::Mat` header in the queue. +* - `std::queue::push(MatCapsule)` creates a deep copy of `cv::Mat` in the queue. +* +* \tparam TAdditionalStorage Here you can attach your own data type to hold any kind of information. +* In case your data type contains simple types, like a structure, you can forget about copying. +* If your data type holds any pointers or complex data type, you have to take care +* its copy constructor and assignment operator. +* +* \see +* - Assignment operator [cv::Mat::operator=](http://docs.opencv.org/master/d3/d63/classcv_1_1Mat.html#aed1f81fe7efaacc2bd95149cdfa34302) +* - Copy constructor [cv::Mat::Mat(const Mat &m)](http://docs.opencv.org/master/d3/d63/classcv_1_1Mat.html#a294eaf8a95d2f9c7be19ff594d06278e) +* +*/ +template +class MatCapsule_ +{ +public: + //! The OpenCV Mat + Mat mat; + + /** \brief Storage for additional information (Template) + * + * You can define you own data type to hold any kind of information. + * - In case your `TAdditionalStorage` holds just simple types you can forget about copying. + * - In case `TAdditionalStorage` holds any pointers or complex data type, you have to take care + * of copy constructor and assignment operator. + */ + TAdditionalStorage vars; + + /** \brief Default constructor + */ + MatCapsule_() { } + + /** \brief Class destructor. Called by queue::pop + */ + ~MatCapsule_() { } + + /** \brief The copy constructor. Called by queue::push + * + * Performs a deep copy of the `mat` member and its data. + * Calls the `TAdditionalStorage` copy constructor if any. + */ + MatCapsule_(const MatCapsule_& src) + { + vars = TAdditionalStorage(src.vars); //Calls the copy constructor for the data type + src.mat.copyTo(mat); + } + + /** \brief The assignment operator. Called BY queue::front + * \warning In case you have any member pointers you need to clone them. + * This is because, next calls to queue::pop calls the class destructor destroying all data. + * With `cv::Mat` the matrix data is still valid thanks to its internal memory counter. + */ + MatCapsule_& operator=(const MatCapsule_& src) + { + //Sanity check + if (this == &src) return *this; + + vars = src.vars; // Calls assignment operator for the data type + mat = src.mat; // Just copy the Mat header + + return *this; + } +}; + +/** \brief For your convenience, If you don't need any additional storage */ +typedef MatCapsule_ MatCapsule; + +/** \brief Abstract base class for handling custom pkQueueTS::Push event. + * + * It provides an abstract interface to create custom OnPush event handler. + * The OnPush event could be used to perform any custom actions like memory analysis on the pkQueueTS. + * + * ### How to use the OnPush event + * Because this class is pure virtual, you have to derive your own class and write your own OnPush(). + * The "OnPush" constructor pkQueueTS::pkQueueTS(pkQueueOnPushBase* onPushHandler) should + * be used + * + * In this case, pkQueueTS::Push() calls `YourDerivedClass::OnPush()` within its own lock. + */ +class pkQueueOnPushBase +{ +public: + /** \brief OnPush event handler interface + * + * User has to defines its own class and implements the method to take needed actions. + * + * \param queueSize Queue size after current push. + * \param elementPtr Pointer to the last pushed element in the queue <`&m_queue.back()`> + * \note This is called by pkQueueTS::Push() within a proper lock on the queue. + * Therefore the OnPush() method is thread safe vs those threads that are writing /reading + * the queue, also in case of multiple producer/consumer. + * \warning Do not use `elementPtr` out of here because pointed to object will be destroyed + * on next pkQueueTS::Pop(). + */ + virtual void OnPush(size_t queueSize, const void *elementPtr) = 0; +}; + +/////////////////////////////////////////////////////////////////////////////// +/** \brief Thread Safe Queue +* +* \author +* pkQueueTS - http://pklab.net/index.php?id=395 +* Copyright (C) 2016 PkLab.net all rights reserved. +* +* This is a thread safe queue, multiple producer multiple consumer. +* It's based on [std::queue](http://en.cppreference.com/w/cpp/container/queue) with single lock. +* A [condition_variable](http://en.cppreference.com/w/cpp/thread/condition_variable) +* with time-out is used to wait for new data. +* +* ## A thread safe queue for OpenCV Mats +* +* This class can be used to manage queue of [OpenCV Mats](http://docs.opencv.org/master/d3/d63/classcv_1_1Mat.html). +* Thanks to [Automatic Memory Management](http://docs.opencv.org/master/d1/dfb/intro.html) of `cv::Mat` +* and C++/STL memory recycling, use of `std::queue` with `cv::Mat` is memory effective. +* +* Unfortunately `cv::Mat` can't be used directly as element because its assignment operator and copy constructor +* just copy the Mat header but not the matrix data. A simple solution is to encapsulates `cv::Mat` +* into a structure or class and to write adequate assignment and constructor. The template class +* MatCapsule_ (or its simple alias `MatCapsule` ) is provided here to this purpose. +* +* Definitely you can write: +* +* \snippet pkQueueTS_TestSimple.cpp pkQueueTS_BasicUsage +* +* ## Additional storage +* +* The template class MatCapsule_ allows to attach any kind of additional storage to the capsule. +* For example you can have: +* +* \snippet pkQueueTS_TestSimple.cpp pkQueueTS_Additional_storage +* +* ## Memory statistics +* +* You can easily measure memory usage and allocations performed while using this class. +* Just write your own `OnPush` event handler and collect wanted measurement (see pkQueueOnPushBase). +* +* For example you can collect all addresses of pushed elements using a simple `std::map` as below: +* +* \snippet pkQueueTS_TestSimple.cpp pkQueueTS_Stats +* +* Please see http://pklab.net/?&id=396 for a detailed example and analysis +*/ +template +class pkQueueTS +{ +public: + + /** \brief Default constructor + */ + pkQueueTS():m_onPushHandler(nullptr) { } + + /** \overload + * \brief "OnPush" Constructor + * + * If the "OnPush" constructor is used, a custom object will be invoked each time + * pkQueueTS::Push() is called. In other words you have an OnPush event handler + * that can be used to perform any custom actions on the queue. + * + * \param onPushHandler A pointer to custom object be derived from + * the abstract class pkQueueOnPushBase + */ + pkQueueTS(pkQueueOnPushBase* onPushHandler) + { + m_onPushHandler = onPushHandler; + } + + /** \brief Default destructor + * + * Calls element destructor for all elements in the queue (if any). + * \warning If element is a pointer, the pointed to objects are not destroyed. + */ + ~pkQueueTS() + { + lock_guard lock(m_mutex); + while (!m_queue.empty()) + m_queue.pop(); //if you are here than you are discarding some elements + } + + /** \brief Retrieves the oldest element in the queue, if any or if wait. + * + * Blocks the caller thread until a new element is available or after the specified + * time-out duration. When/If the element has been retrieved correctly is it's removed + * from the queue. + * + * \param [out] element If PK_QTS_OK is returned, contains the oldest element in the queue. + * \param timeoutMs You can specify a time (milliseconds) to wait for a valid elements + * - `timeoutMs = 0` [default] means no wait at all + * - `timeoutMs > 0` means wait for given time out + * \return pkQueueResults One of the following: + * - PK_QTS_OK The element has been retrieved correctly + * - PK_QTS_TIMEOUT The queue is empty but we wont wait + * - PK_QTS_EMPTY Time out has reached waiting a valid element + */ + pkQueueResults Pop(TElement &element, unsigned timeoutMs = 0) + { + unique_lock lock(m_mutex); + // wait for a data (ignoring spurious awakenings) + while (m_queue.empty()) + { + // if no data and won't wait; + if (timeoutMs == 0) + { + return PK_QTS_EMPTY; + } + // else wait data with time-out + else if (cv_status::timeout == m_dataReady.wait_for(lock, chrono::milliseconds(timeoutMs))) + { + // Time out while waiting a valid element + return PK_QTS_TIMEOUT; + } + } // data available + element = m_queue.front(); //calls TElement::=operator + m_queue.pop(); + return PK_QTS_OK; + } + + /** \brief Inserts an element at the end of the queue + * + * \param element The element to add. + * \return The queue size after that the element has been added + */ + size_t Push(const TElement &element) + { + unique_lock lock(m_mutex); + m_queue.push(element); //calls TElement::copy_constructor + + size_t N = m_queue.size(); + if (m_onPushHandler) + { + void * ptr = &(m_queue.back()); + m_onPushHandler->OnPush(N, ptr); + } + + // The notifying thread does not need to hold the lock on the + // same mutex as the one held by the waiting thread(s); + // in fact doing so is a pessimization + lock.unlock(); + m_dataReady.notify_one(); + return N; + } + + /** \brief Returns the size of the queue + */ + size_t Size() + { + lock_guard lock(m_mutex); + return m_queue.size(); + } + + /** \brief Test whether queue is empty + */ + bool Empty() const + { + lock_guard lock(m_mutex); + return m_queue.empty(); + } + +private: + std::queue m_queue; + std::condition_variable m_dataReady; + mutable std::mutex m_mutex; + pkQueueOnPushBase* m_onPushHandler; + +}; + +//! @} +#endif //PKQUEUETS_H diff --git a/src/track_targets.cpp b/src/track_targets.cpp index 6616df5..4c52daf 100644 --- a/src/track_targets.cpp +++ b/src/track_targets.cpp @@ -19,15 +19,21 @@ Run separately with: ./track_targets -d TAG36h11 /dev/video0 calibration.yml 0.2 #include #include #include +#include +#include #include #include #include #include #include + #include "args.hxx" +#include "pkQueueTS.hpp" + #include #include #include + #include "aruco/aruco.h" #include "aruco/timers.h" @@ -167,6 +173,83 @@ void changeActiveMarker(std::map> &marker_history_queu } } +// Setup our incoming threaded camera feed +struct TimeCapsuleVars +{ + uint64_t timeStamp = 0; + uint64_t frameNum = 0; +}; +typedef MatCapsule_ MatTimeCapsule; +pkQueueTS incomingQueue; +atomic grabState; + +// Initiate camera +cv::VideoCapture vreader; + +// Thread to control capture device and push frames onto the threadsafe queue +void incomingThread() +{ + // Enable capture input + grabState = true; + // Incoming frame + MatTimeCapsule iframe; + uint64_t frameNum = 0; + std::cout << "Starting incomingThread()" << std::endl; + + while (grabState) + { + // Read the next frame in from the camera and push it to back of queue + // std::cout << "debug:Pushing camera frame to queue" << std::endl; + vreader.grab(); + // Lodge clock for start of frame, after grabbing the frame but before decompressing/converting it. + // This is as close as we can get to the shutter time, for a better match to inertial frame. + frameNum++; + iframe.vars.frameNum = frameNum; + iframe.vars.timeStamp = CLOCK() * 1e+6; + // Now process the grabbed frame + vreader.retrieve(iframe.mat); + // Push the timeframe capsule onto the queue, ready for collection and processing + incomingQueue.Push(iframe); + + // If there is more than 1 frame in the queue, pop the rest to prevent buildup + while (incomingQueue.Size() > 1) + incomingQueue.Pop(iframe); + + } +} + +// Setup our outgoing threaded video stream +pkQueueTS< MatCapsule > outgoingQueue; +atomic pushState; +// Initiate stream +cv::VideoWriter vwriter; +// Thread to control writer device, read frames from the threadsafe queue and push into the video stream +void outgoingThread() +{ + // Enable stream output + pushState = true; + // Outgoing frame + MatCapsule oframe; + std::cout << "Starting outgoingThread()" << std::endl; + + while (pushState) + { + int qsize = outgoingQueue.Size(); + // Read frame in from the queue and push it to stream + if (qsize > 0) + { + std::cout << "debug:Pushing output frame to stream:" << qsize << std::endl; + // If the queue already has more than 1 frame then pop it, to prevent buildup + // We only want to send the latest frame + while (qsize > 1) + outgoingQueue.Pop(oframe); + // Pop the frame off the queue and push to stream + outgoingQueue.Pop(oframe); + vwriter.write(oframe.mat); + } + } +} + // main.. int main(int argc, char **argv) { @@ -174,6 +257,9 @@ int main(int argc, char **argv) std::cout.setf(std::ios::unitbuf); std::ios_base::sync_with_stdio(false); + // Make sure std::cout does not show scientific notation + std::cout.setf(ios::fixed); + // Setup arguments for parser args::ArgumentParser parser("Track fiducial markers and estimate pose, output translation vectors for vision_landing"); args::HelpFlag help(parser, "help", "Display this help menu", {'h', "help"}); @@ -222,10 +308,8 @@ int main(int argc, char **argv) return 1; } - // Setup core objects - aruco::CameraParameters CamParam; - Mat rawimage; - VideoCapture vreader(args::get(input)); + // Open camera + vreader.open( args::get(input) ); // Bail if camera can't be opened if (!vreader.isOpened()) @@ -264,6 +348,7 @@ int main(int argc, char **argv) // vreader.set(CAP_PROP_BUFFERSIZE, 0); // Doesn't work yet with V4L2 // Read and parse camera calibration data + aruco::CameraParameters CamParam; CamParam.readFromXMLFile(args::get(calibration)); if (!CamParam.isValid()) { @@ -272,6 +357,8 @@ int main(int argc, char **argv) } // Take a single image and resize calibration parameters based on input stream dimensions + // Note we read directly from vreader here because we haven't turned the reader thread on yet + Mat rawimage; vreader >> rawimage; CamParam.resize(rawimage.size()); @@ -281,25 +368,31 @@ int main(int argc, char **argv) const double fovy = 2 * atan(inputheight / (2 * CamParam.CameraMatrix.at(1, 1))) * (180.0 / pi); std::cout << "info:FoVx~" << fovx << ":FoVy~" << fovy << ":vWidth~" << inputwidth << ":vHeight~" << inputheight << std::endl; - // Create an output object, if output specified then setup the pipeline unless output is set to 'window' - VideoWriter writer; + // Turn on incoming thread + std::thread inThread(incomingThread); + MatTimeCapsule iframe; + + // If output specified then setup the pipeline unless output is set to 'window' if (output && args::get(output) != "window") { if (fourcc) { std::string _fourcc = args::get(fourcc); - writer.open(args::get(output), CV_FOURCC(_fourcc[0], _fourcc[1], _fourcc[2], _fourcc[3]), inputfps, cv::Size(inputwidth, inputheight), true); + vwriter.open(args::get(output), CV_FOURCC(_fourcc[0], _fourcc[1], _fourcc[2], _fourcc[3]), inputfps, cv::Size(inputwidth, inputheight), true); } else { - writer.open(args::get(output), 0, inputfps, cv::Size(inputwidth, inputheight), true); + vwriter.open(args::get(output), 0, inputfps, cv::Size(inputwidth, inputheight), true); } - if (!writer.isOpened()) + if (!vwriter.isOpened()) { std::cerr << "Error can't create video writer" << std::endl; return 1; } } + // Turn on writer thread + std::thread outThread(outgoingThread); + MatCapsule oframe; // Setup the marker detection double MarkerSize = args::get(markersize); @@ -307,7 +400,7 @@ int main(int argc, char **argv) MarkerDetector::Params &MParams = MDetector.getParameters(); MDetector.setDetectionMode(aruco::DM_VIDEO_FAST, 0.02); MParams.setAutoSizeSpeedUp(true,0.3); - MParams.maxThreads = -1; // -1 = all + MParams.maxThreads = 1; // -1 = all MParams.setCornerRefinementMethod(aruco::CORNER_SUBPIX); MParams.NAttemptsAutoThresFix = 3; // This is the number of random threshold iterations when no markers found @@ -322,14 +415,6 @@ int main(int argc, char **argv) // Start framecounter at 0 for fps tracking int frameno = 0; - // Setup stdin listener - /* - string incommand; - pollfd cinfd[1]; - cinfd[0].fd = fileno(stdin); - cinfd[0].events = POLLIN; - */ - // Create a map of marker sizes from 'sizemapping' config setting std::map markerSizes; std::stringstream ss(args::get(sizemapping)); @@ -387,52 +472,38 @@ int main(int argc, char **argv) break; } - // Listen for commands on stdin - // This doesn't work yet, it does weird things as soon as something comes in on stdin - /* - if (poll(cinfd, 1, 1000)) - { - cout << "INCOMING MESSAGE!:" << endl; - // getline(cin, incommand); - // cin >> incommand; - // cout << "MESSAGE RECEIVED!:" << incommand << endl; - stateflag = 1; - cin.clear(); - } - */ - // If tracking not active, skip if (!stateflag) { // Add a 1ms sleep to slow down the loop if nothing else is being done nanosleep((const struct timespec[]){{0, 10000000L}}, NULL); - // If camera is started, stop and release it - /* - https://github.com/fnoop/vision_landing/issues/45 - if (vreader.isOpened()) - vreader.release(); - */ continue; } - // Start a Timer + // Start Timers + double framestart = CLOCK(); ScopedTimerEvents Timer("detectloop"); - // Copy image from input stream to cv matrix - vreader.grab(); - Timer.add("GrabImage"); - // Lodge clock for start of frame, after grabbing the frame but before decompressing/converting it. - // This is as close as we can get to the shutter time, to match to inertial frame. - double framestart = CLOCK(); - vreader.retrieve(rawimage); - Timer.add("DecodeImage"); - + // Read a camera frame from the incoming thread, with 1 second timeout + // std::cout << "debug:Incoming queue size:" << incomingQueue.Size() << std::endl; + pkQueueResults res = incomingQueue.Pop(iframe, 1000); + if (res != PK_QTS_OK) + { + if (verbose && res == PK_QTS_TIMEOUT) + std::cout << "debug:Time out reading from the camera thread" << std::endl; + if (verbose && res == PK_QTS_EMPTY) + std::cout << "debug:Camera thread is empty" << std::endl; + this_thread::yield(); + continue; + } + Timer.add("PopImage"); + // Skip this loop iteration if the frame was empty - if (rawimage.empty()) + if (iframe.mat.empty()) continue; // Detect markers - std::vector Markers = MDetector.detect(rawimage); + std::vector Markers = MDetector.detect(iframe.mat); Timer.add("MarkerDetect"); // Order the markers in ascending size - we want to start with the smallest. @@ -482,10 +553,10 @@ int main(int argc, char **argv) { if (active_marker == thisId) break; // Don't change to the same thing - std::cout << "debug:changing active_marker:" << thisId << ":" << _histsum << ":" << _histthresh << ":" << std::endl; changeActiveMarker(marker_history_queue, active_marker, thisId, marker_history); if (verbose) { + std::cout << "debug:changing active_marker:" << thisId << ":" << _histsum << ":" << _histthresh << ":" << std::endl; std::cout << "debug:marker history:"; for (auto &markerhist : marker_history_queue) { @@ -509,7 +580,8 @@ int main(int argc, char **argv) float _histthresh = marker_history * ((float)marker_threshold / (float)100); if (_histsum > _histthresh) { - std::cout << "debug:changing active_marker:" << thisId << std::endl; + if (verbose) + std::cout << "debug:changing active_marker:" << thisId << std::endl; changeActiveMarker(marker_history_queue, active_marker, thisId, marker_history); break; } @@ -532,7 +604,8 @@ int main(int argc, char **argv) else if (MarkerSize) { _size = MarkerSize; - std::cout << "debug:defaulting to generic marker size: " << markerArea.second << std::endl; + if (verbose) + std::cout << "debug:defaulting to generic marker size: " << markerArea.second << std::endl; } // Find the Marker in the Markers map and do pose estimation. I'm sure there's a better way of iterating through the map.. for (unsigned int i = 0; i < Markers.size(); i++) @@ -554,7 +627,7 @@ int main(int argc, char **argv) { if (output) { - Markers[i].draw(rawimage, Scalar(0, 255, 0), 2, false); + Markers[i].draw(iframe.mat, Scalar(0, 255, 0), 2, false); } // If pose estimation was successful, calculate data and output to anyone listening. if (Markers[i].Tvec.at(0, 2) > 0) @@ -568,16 +641,19 @@ int main(int argc, char **argv) { Ti = MParams.minSize; ThresHold = MParams.ThresHold; - std::cout << "debug:active_marker:" << active_marker << ":center~" << Markers[i].getCenter() << ":area~" << Markers[i].getArea() << ":marker~" << Markers[i] << ":vectornorm~" << distance << ":Ti~" << Ti << ":ThresHold~" << ThresHold << ":Markers~" << Markers.size() << ":processtime~" << processtime << std::endl; + std::cout << "debug:active_marker:" << active_marker << ":center~" << Markers[i].getCenter() << ":area~" << Markers[i].getArea() << ":marker~" << Markers[i] << ":vectornorm~" << distance << ":Ti~" << Ti << ":ThresHold~" << ThresHold << ":Markers~" << Markers.size() << ":processtime~" << processtime << ":timestamp~" << iframe.vars.timeStamp << std::endl; } - std::cout << "target:" << Markers[i].id << ":" << xoffset << ":" << yoffset << ":" << distance << ":" << processtime << std::endl; + // This is the main message that track_targets outputs, containing the active target data + std::cout << "target:" << Markers[i].id << ":" << xoffset << ":" << yoffset << ":" << distance << ":" << processtime << ":" << iframe.vars.timeStamp << ":" << CLOCK() * 1e+6 << std::endl << std::flush; + // std::cout << "target:" << Markers[i].id << ":" << xoffset << ":" << yoffset << ":" << distance << ":" << processtime << ":" << CLOCK() * 1e+6 << std::endl << std::flush; + std::fflush(stdout); // explicitly flush stdout buffer Timer.add("SendMessage"); // Draw AR cube and distance if (output) { // don't burn cpu cycles if no output - drawARLandingCube(rawimage, Markers[i], CamParam); - CvDrawingUtils::draw3dAxis(rawimage, Markers[i], CamParam); - drawVectors(rawimage, Scalar(0, 255, 0), 1, (i + 1) * 20, Markers[i].id, xoffset, yoffset, distance, Markers[i].getCenter().x, Markers[i].getCenter().y); + drawARLandingCube(iframe.mat, Markers[i], CamParam); + CvDrawingUtils::draw3dAxis(iframe.mat, Markers[i], CamParam); + drawVectors(iframe.mat, Scalar(0, 255, 0), 1, (i + 1) * 20, Markers[i].id, xoffset, yoffset, distance, Markers[i].getCenter().x, Markers[i].getCenter().y); Timer.add("DrawGreenAR"); } } @@ -585,33 +661,38 @@ int main(int argc, char **argv) } else { - std::cout << "debug:inactive_marker:center~" << Markers[i].getCenter() << ":area~" << Markers[i].getArea() << ":marker~" << Markers[i] << ":Ti~" << Ti << ":ThresHold~" << ThresHold << ":Markers~" << Markers.size() << ":processtime~" << processtime << std::endl; + if (verbose) + std::cout << "debug:inactive_marker:center~" << Markers[i].getCenter() << ":area~" << Markers[i].getArea() << ":marker~" << Markers[i] << ":Ti~" << Ti << ":ThresHold~" << ThresHold << ":Markers~" << Markers.size() << ":processtime~" << processtime << std::endl; if (output) { // don't burn cpu cycles if no output - Markers[i].draw(rawimage, Scalar(0, 0, 255), 2, false); - drawVectors(rawimage, Scalar(0, 0, 255), 1, (i + 1) * 20, Markers[i].id, 0, 0, Markers[i].Tvec.at(0, 2), Markers[i].getCenter().x, Markers[i].getCenter().y); + Markers[i].draw(iframe.mat, Scalar(0, 0, 255), 2, false); + drawVectors(iframe.mat, Scalar(0, 0, 255), 1, (i + 1) * 20, Markers[i].id, 0, 0, Markers[i].Tvec.at(0, 2), Markers[i].getCenter().x, Markers[i].getCenter().y); Timer.add("DrawRedAR"); } } } + Timer.add("UseMarkers"); if (output && args::get(output) != "window") { - writer << rawimage; - Timer.add("OutputImage"); + oframe.mat = iframe.mat.clone(); + // iframe.mat.copyTo(oframe.mat); + Timer.add("Out-CloneMat"); + outgoingQueue.Push(oframe); + Timer.add("Out-PushImage"); } else if (output && args::get(output) == "window") { - imshow("vision_landing", rawimage); + imshow("vision_landing", iframe.mat); Timer.add("OutputImage"); } // Lodge clock for end of frame double framedur = CLOCK() - framestart; - // Print fps info every 100 frames if in debug mode + // Print fps info every 100 frames char framestr[100]; - sprintf(framestr, "debug:avgframedur~%fms:fps~%f:frameno~%d:", avgdur(framedur), avgfps(), frameno++); + sprintf(framestr, "info:avgframedur~%fms:fps~%f:frameno~%d:", avgdur(framedur), avgfps(), frameno++); if (verbose && (frameno % 100 == 1)) { std::cout << framestr << std::endl; @@ -619,7 +700,16 @@ int main(int argc, char **argv) Timer.add("EndofLoop"); } + // Stop incoming thread and flush queue std::cout << "info:track_targets complete, exiting" << std::endl; + grabState = false; + pushState = false; + while (PK_QTS_EMPTY != incomingQueue.Pop(iframe, 0)); + inThread.join(); + while (PK_QTS_EMPTY != outgoingQueue.Pop(oframe, 0)); + outThread.join(); + vreader.release(); + vwriter.release(); std::cout.flush(); return 0; } diff --git a/vision_landing b/vision_landing index 7abcdca..6ad5a56 100755 --- a/vision_landing +++ b/vision_landing @@ -9,7 +9,7 @@ # It launches a separate program track_targets which does the actual cv work and captures the resulting vectors from non-blocking thread handler. from threading import Thread, Event -from Queue import Queue, Empty +from Queue import LifoQueue, Queue, Empty from subprocess import Popen, PIPE from dronekit import connect, VehicleMode, LocationGlobal, LocationGlobalRelative from pymavlink import mavutil @@ -23,6 +23,7 @@ from collections import deque from math import sqrt import signal import logging +import ctypes ### ================================ ### Define Classes @@ -34,11 +35,14 @@ class ThreadedReader: def __init__(self, stdout, stderr): self.stdout = stdout self.stderr = stderr - self.queue = Queue() + self.queue = LifoQueue() self._stop = Event() def insertQueue(pipe, queue, qtype="stdout"): while not self.stopped(): - line = pipe.readline().rstrip() + self.clear() # Clear the queue before adding anything - we only want the latest data + line = pipe.readline().rstrip() + ":{:.0f}".format(monotonic_time()) + if args.verbose: + log.debug("Adding line: {} : {}".format(qtype, line)) if line and qtype == "stdout": queue.put(line) elif line and qtype == "stderr": @@ -49,14 +53,25 @@ class ThreadedReader: self.threaderr = Thread(target = insertQueue, args = (self.stderr, self.queue, "stderr")) self.threaderr.daemon = True self.threaderr.start() + def size(self): + return self.queue.qsize() def readline(self, timeout = None): try: - return self.queue.get(block = timeout is not None, timeout = timeout) + line = self.queue.get(block = timeout is not None, timeout = timeout) + self.queue.task_done() + return line except Empty: + log.debug("empty queue") return None def clear(self): - with self.queue.mutex: - self.queue.queue.clear() + #with self.queue.mutex: + # self.queue.clear() + while self.size() > 10: + # self.readline() + tmpline = self.queue.get(False) + log.debug("clearing queue: {} : {}".format(self.size(), tmpline)) + self.queue.task_done() + return def stop(self): self._stop.set() def stopped(self): @@ -76,16 +91,18 @@ class TrackTime: self.difference_buffer = deque(maxlen=50) # Circular buffer to track difference between two systems self.debug = False self.vehicle = craft.vehicle + # Setup time struct to hold local monotonic clock queries + self.t = timespec() # Setup listener decorator @self.vehicle.on_message("TIMESYNC") def listener_timesync(subself, name, message): try: if message.ts1 < self.mytime_nanos: - log.info("Newer timesync message already received, ignoring") + #log.info("Newer timesync message already received, ignoring") return False self.mytime_nanos = message.ts1 # Set the time that the message was sent _cts = self.current_timestamp() - _avg_ts = (self.local_tracker[message.ts1] + _cts)/2; + _avg_ts = (self.local_tracker[message.ts1] + _cts)/2 self._delta = _cts - _avg_ts # Calculuate the time delta self.delta_buffer.append(self._delta) # Push the delta into the circular buffer # If the averaged delta has been reached, use that, and update the delta from the latest buffer @@ -97,7 +114,6 @@ class TrackTime: self.fctime_nanos = message.tc1 - self._delta self.difference_buffer.append(self.mytime_nanos - self.fctime_nanos) self.difference = sum(list(self.difference_buffer)) / len(list(self.difference_buffer)) - # print "difference_buffer:",self.difference_buffer if self.debug: log.debug("Timesync message received:"+", ".join([str(message), str(self._delta), str(self.fctime_nanos)])) except: @@ -113,9 +129,11 @@ class TrackTime: # When full, average the delta buffer and then set that to use as the ongoing delta if len(self.delta_buffer) == 50: self.delta = sum(list(self.delta_buffer)) / len(list(self.delta_buffer)) - # Method to return local UCT time in nanoseconds since epoch. Precision depends on platform. + else: + log.warning("Error creating timesync delta buffer - unpredictable results will follow") + # Method to return monotonic clock time def current_timestamp(self): - return int((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds() * 1000000000); + return monotonic_time() # Request a timesync update from the flight controller def update(self, ts=0, tc=0): if ts == 0: @@ -133,9 +151,11 @@ class TrackTime: # Return actual + time elapsed since actual set def estimate(self): estmynanos = self.current_timestamp() - self.difference - estfcnanos = self.fctime_nanos + (self.current_timestamp() - self.mytime_nanos) + estfcnanos = self.actual() + (self.current_timestamp() - self.mytime_nanos) #log.debug("Estimate time: current-difference:"+str(estmynanos/1000000)+", fctime_nanos+elapsed:"+str(estfcnanos/1000000)+", difference:"+str((estmynanos - estfcnanos) / 1000000)) return estmynanos + def toFcTime(self, inTime): + return inTime - self.difference # TrackTargets class launches, controls and handles the separate track_targets process that does the actual vision processing. class TrackTargets: @@ -214,24 +234,30 @@ class TrackTargets: self.state = "initialised" return # Unpack and cast target data to correct types - [id,x,y,z,processtime] = trackdata[1:] - [id,x,y,z,processtime] = [int(id),float(x),float(y),float(z),float(processtime)] + try: + [id,x,y,z,processtime,sensorTimestamp,sendTimestamp,receiveTimestamp] = trackdata[1:] + [id,x,y,z,processtime,sensorTimestamp,sendTimestamp,receiveTimestamp] = [int(id),float(x),float(y),float(z),float(processtime),float(sensorTimestamp),float(sendTimestamp),float(receiveTimestamp)] + except: + log.debug("track_targets: error in data: " + str(line.rstrip())) + return # If fake rangefinder distance is required, send it if args.fakerangefinder: craft.send_distance_message(int(z*100)) # Send the landing_target message to arducopter # NOTE: This sends the current estimated fctime, NOT the time the frame was actually captured. # So this does not allow for the CV processing time. This should be sending when the frame is actually captured, before processing. - craft.send_land_message(x,y,z, tracktime.estimate()/1000) # Send in micros, not nanos - # Track frame timing to calculate fps, running average over last 5 frames + nowTime = monotonic_time() + # If data is less than 250ms since the image shuttr, send the message. + if nowTime - sensorTimestamp < 250 * 1e+6: + craft.send_land_message(x, y, z, tracktime.toFcTime(sensorTimestamp) / 1000, id) # Send sensor timestamp adjusted to timesync, in micros + # Track frame timing to calculate fps, running average over last 10 frames end_t = time() time_taken = end_t - self.start_t self.start_t = end_t self.frame_times.append(time_taken) - self.frame_times = self.frame_times[-5:] + self.frame_times = self.frame_times[-10:] fps = len(self.frame_times) / sum(self.frame_times) - # log.info("Fctime:"+str(tracktime.estimate())+", Fps: "+str(fps)+", Alt:"+ str(craft.vehicle.location.global_relative_frame.alt)+", Rangefinder:"+ str(craft.vehicle.rangefinder.distance)+ ", Marker:"+ str(id)+ ", Distance:"+str(z)+ "xOffset:"+str(x)+ "yOffset:"+str(y)) - log.info("Fctime:"+str(tracktime.actual())+", Fps: "+str(fps)+", Alt:"+ str(craft.vehicle.location.global_relative_frame.alt)+", Rangefinder:"+ str(craft.vehicle.rangefinder.distance)+ ", Marker:"+ str(id)+ ", Distance:"+str(z)+ ", xOffset:"+str(x)+ ", yOffset:"+str(y) + ", processTime:"+str(processtime)) + log.info("Fctime:{:.0f}:{:.0f}, Fps:{:.2f}, Alt:{}, Rng:{}, Marker:{}, Distance:{}, xOffset:{}, yOffset:{}, processTime:{:.0f}, sensorTime:{:.0f}, sendTime:{:.0f}, receiveTime:{:.0f}, currentTime:{:.0f}, sensorDiff:{}, sendDiff:{}, receiveDiff:{}".format(tracktime.actual(), tracktime.estimate(), fps, craft.vehicle.location.global_relative_frame.alt, craft.vehicle.rangefinder.distance, id, z, x, y, processtime, sensorTimestamp, sendTimestamp, receiveTimestamp, nowTime, nowTime - sensorTimestamp, nowTime - sendTimestamp, nowTime - receiveTimestamp)) def start(self): if self.state == "initialised": log.info("Requesting track_targets to start tracking:"+str(craft.vehicle.mode)) @@ -239,8 +265,6 @@ class TrackTargets: self.state = "started" else: log.info("Request to start track_targets tracking failed, state is not 'initialised'") - # self.state = "started" - # self.process.stdin.write("BLAAAAAAH!\n") # This doesn't work yet def stop(self): log.info("Requesting track_targets to stop tracking:"+str(craft.vehicle.mode)) self.state = "initialised" @@ -295,21 +319,27 @@ class Craft: log.debug("Sending mavlink distance_message:" +str(dist)) # Define function to send landing_target mavlink message for mavlink based precision landing # http://mavlink.org/messages/common#LANDING_TARGET - def send_land_message(self, x,y,z, time_usec=0): + def send_land_message(self, x,y,z, time_usec=0, target_num=0): msg = self.vehicle.message_factory.landing_target_encode( - time_usec, # time since system boot, not used - 0, # target num, not used + time_usec, # time target data was processed, as close to sensor capture as possible + target_num, # target num, not used mavutil.mavlink.MAV_FRAME_BODY_NED, # frame, not used x, # X-axis angular offset, in radians y, # Y-axis angular offset, in radians z, # distance, in meters 0, # Target x-axis size, in radians - 0 # Target y-axis size, in radians + 0, # Target y-axis size, in radians + 0, # x float X Position of the landing target on MAV_FRAME + 0, # y float Y Position of the landing target on MAV_FRAME + 0, # z float Z Position of the landing target on MAV_FRAME + (1,0,0,0), # q float[4] Quaternion of landing target orientation (w, x, y, z order, zero-rotation is 1, 0, 0, 0) + 2, # type of landing target: 2 = Fiducial marker + 1, # position_valid boolean ) self.vehicle.send_mavlink(msg) self.vehicle.flush() if args.verbose: - log.debug("Sending mavlink landing_target:"+str(x)+" "+str(y)+" "+str(z)) + log.debug("Sending mavlink landing_target - time_usec:{:.0f}, x:{}, y:{}, z:{}".format(time_usec, str(x), str(y), str(z))) # Define function that arms and takes off, used for testing in SITL # Lifted from dronekit-python examples def arm_and_takeoff(self, targetAltitude): @@ -527,6 +557,25 @@ sigtracker = SigTrack() signal.signal(signal.SIGINT, sigtracker.handle_sig) signal.signal(signal.SIGTERM, sigtracker.handle_sig) +# Setup monotonic clock +CLOCK_MONOTONIC_RAW = 4 +class timespec(ctypes.Structure): + _fields_ = [ + ('tv_sec', ctypes.c_long), + ('tv_nsec', ctypes.c_long) + ] +librt = ctypes.CDLL('librt.so.1', use_errno=True) +clock_gettime = librt.clock_gettime +clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)] + +def monotonic_time(): + t = timespec() + if clock_gettime(CLOCK_MONOTONIC_RAW , ctypes.pointer(t)) != 0: + errno_ = ctypes.get_errno() + raise OSError(errno_, os.strerror(errno_)) + return (t.tv_sec * 1e+9) + t.tv_nsec + + # Connect to the Vehicle log.info("Connecting to drone on: %s" % args.connect) craft = Craft(args.connect) @@ -546,7 +595,7 @@ tracktime.debug = False while True: tracktime.initial_sync() if not tracktime.delta: - log.warn("Timesync not supported by flight controller. THIS CAN LEAD TO DANGEROUS AND UNPREDICTABLE BEHAVIOUR.") + log.warning("Timesync not supported by flight controller. THIS CAN LEAD TO DANGEROUS AND UNPREDICTABLE BEHAVIOUR.") break log.info("Initial timesync complete, offset is {} ms".format(tracktime.delta/1000000)) log.info("Initial timesync difference is {} ms".format(tracktime.difference/1000000)) @@ -569,13 +618,13 @@ while True: # Set some parameters important to precision landing log.info("Setting flight controller parameters") -craft.vehicle.parameters['PLND_ENABLED'] = 1 -craft.vehicle.parameters['PLND_TYPE'] = 1 # Mavlink landing mode +craft.vehicle.parameters['PLND_ENABLED'] = 1 +craft.vehicle.parameters['PLND_TYPE'] = 1 # Mavlink landing backend # Set estimator type to 'raw', as ekf estimator can be very erratic try: if 'PLND_EST_TYPE' in craft.vehicle.parameters: # TODO: Set a config parameter for estimator type - craft.vehicle.parameters['PLND_EST_TYPE'] = 0 # https://github.com/ArduPilot/ardupilot/pull/5491, not yet in arducopter + craft.vehicle.parameters['PLND_EST_TYPE'] = 0 except: pass # Set the precland buffer to 250ms. This should give enough buffer space for even very slow computers @@ -604,7 +653,7 @@ if args.simulator: craft.vehicle.parameters['RNGFND_SCALING'] = 12.12 # Take off to 25m altitude and start landing if not already armed if not craft.vehicle.armed: - craft.arm_and_takeoff(25) + craft.arm_and_takeoff(100) # Start landing log.info("Starting landing...") craft.vehicle.mode = VehicleMode("LAND") @@ -643,12 +692,18 @@ while True: track_targets.start() # See if there are any tracking results in the queue - line = track_targets.reader.readline() + #log.debug("Main loop: Before readline: {:.0f}".format(monotonic_time())) + line = track_targets.reader.readline(1) + #log.debug("Main loop: After readline: {:.0f}".format(monotonic_time())) if line: + if args.verbose: + log.debug("queue size:{}, ourtime:{:.0f}, line:{}".format(track_targets.reader.size(), monotonic_time(), line)) track_targets.processline(line) + #log.debug("Main loop: After processline: {:.0f}".format(monotonic_time())) else: - # Add a short 1s sleep to slow down the loop, otherwise it consumes 100% cpu. - sleep(1) + # Add a short 100ms sleep to slow down the loop, otherwise it consumes 100% cpu. + log.debug("Main loop queue read empty, sleeping for a few ms") + sleep(0.1) if args.simulator and not craft.vehicle.armed and not track_targets.shutdown: log.info("Landed")