Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Dedicated event signalling for each thread #46

Merged
merged 2 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ void loop()
Serial.block();
Serial.println("Thread #0: Lorem ipsum ...");
Serial.unblock();

/* If we don't hand back control then the main thread
* will hog the CPU and all other thread's won't get
* time to be executed.
*/
rtos::ThisThread::yield();
}

/**************************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,5 @@ void setup()

void loop()
{
/* If we don't hand back control then the main thread
* will hog the CPU and all other thread's won't get
* time to be executed.
*/
rtos::ThisThread::yield();

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,4 @@ void loop()
Serial.println();
Serial.unblock();
}

/* If we don't hand back control then the main thread
* will hog the CPU and all other thread's won't get
* time to be executed.
*/
rtos::ThisThread::yield();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,4 @@ void loop()
Serial.print("] Thread #0: Lorem ipsum ...");
Serial.println();
Serial.unblock();

/* If we don't hand back control then the main thread
* will hog the CPU and all other thread's won't get
* time to be executed.
*/
rtos::ThisThread::yield();
}
59 changes: 29 additions & 30 deletions src/io/serial/SerialDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
SerialDispatcher::SerialDispatcher(arduino::HardwareSerial & serial)
: _is_initialized{false}
, _mutex{}
, _cond{_mutex}
, _serial{serial}
, _thread(osPriorityRealtime, 4096, nullptr, "SerialDispatcher")
, _has_tread_started{false}
Expand Down Expand Up @@ -68,11 +67,8 @@ void SerialDispatcher::begin(unsigned long baudrate, uint16_t config)
/* Since the thread is not in the list yet we are
* going to create a new entry to the list.
*/
ThreadCustomerData data;
data.thread_id = current_thread_id;
data.block_tx_buffer = false;
data.prefix_func = nullptr;
data.suffix_func = nullptr;
uint32_t const thread_event_flag = (1<<(_thread_customer_list.size()));
ThreadCustomerData data{current_thread_id, thread_event_flag};
_thread_customer_list.push_back(data);
}
}
Expand All @@ -87,7 +83,10 @@ void SerialDispatcher::end()
osThreadId_t const current_thread_id = rtos::ThisThread::get_id();
std::remove_if(std::begin(_thread_customer_list),
std::end (_thread_customer_list),
[current_thread_id](ThreadCustomerData const d) -> bool { return (d.thread_id == current_thread_id); });
[current_thread_id](ThreadCustomerData const d) -> bool
{
return (d.thread_id == current_thread_id);
});

/* If no thread consumers are left also end
* the serial device altogether.
Expand All @@ -104,7 +103,7 @@ int SerialDispatcher::available()
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());
if (iter == std::end(_thread_customer_list)) return 0;
assert(iter != std::end(_thread_customer_list));

prepareSerialReader(iter);
handleSerialReader();
Expand All @@ -116,7 +115,7 @@ int SerialDispatcher::peek()
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());
if (iter == std::end(_thread_customer_list)) return 0;
assert(iter != std::end(_thread_customer_list));

prepareSerialReader(iter);
handleSerialReader();
Expand All @@ -128,7 +127,7 @@ int SerialDispatcher::read()
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());
if (iter == std::end(_thread_customer_list)) return 0;
assert(iter != std::end(_thread_customer_list));

prepareSerialReader(iter);
handleSerialReader();
Expand All @@ -150,14 +149,8 @@ size_t SerialDispatcher::write(uint8_t const b)
size_t SerialDispatcher::write(const uint8_t * data, size_t len)
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);

auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());

/* If this thread hasn't registered yet
* with the SerialDispatcher via 'begin'.
*/
if (iter == std::end(_thread_customer_list))
return 0;
assert(iter != std::end(_thread_customer_list));

size_t bytes_written = 0;
for (; (bytes_written < len) && iter->tx_buffer.availableForStore(); bytes_written++)
Expand All @@ -166,7 +159,7 @@ size_t SerialDispatcher::write(const uint8_t * data, size_t len)
/* Inform the worker thread that new data has
* been written to a Serial transmit buffer.
*/
_cond.notify_one();
_data_available_for_transmit.set(iter->thread_event_flag);

return bytes_written;
}
Expand All @@ -175,32 +168,37 @@ void SerialDispatcher::block()
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());
if (iter == std::end(_thread_customer_list)) return;
assert(iter != std::end(_thread_customer_list));

iter->block_tx_buffer = true;
}

void SerialDispatcher::unblock()
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());
if (iter == std::end(_thread_customer_list)) return;
assert(iter != std::end(_thread_customer_list));

iter->block_tx_buffer = false;
_cond.notify_one();

_data_available_for_transmit.set(iter->thread_event_flag);
}

void SerialDispatcher::prefix(PrefixInjectorCallbackFunc func)
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());
if (iter == std::end(_thread_customer_list)) return;
assert(iter != std::end(_thread_customer_list));

iter->prefix_func = func;
}

void SerialDispatcher::suffix(SuffixInjectorCallbackFunc func)
{
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
auto iter = findThreadCustomerDataById(rtos::ThisThread::get_id());
if (iter == std::end(_thread_customer_list)) return;
assert(iter != std::end(_thread_customer_list));

iter->suffix_func = func;
}

Expand All @@ -226,12 +224,10 @@ void SerialDispatcher::threadFunc()

while(!_terminate_thread)
{
/* Prevent race conditions by multi-threaded
* access to shared data.
*/
mbed::ScopedLock<rtos::Mutex> lock(_mutex);
/* Wait for new data to be available */
_cond.wait();
/* Wait for data to be available in a transmit buffer. */
static uint32_t constexpr ALL_EVENT_FLAGS = 0x7fffffff;
_data_available_for_transmit.wait_any(ALL_EVENT_FLAGS, osWaitForever, /* clear */ true);

/* Iterate over all list entries. */
std::for_each(std::begin(_thread_customer_list),
std::end (_thread_customer_list),
Expand Down Expand Up @@ -303,7 +299,10 @@ std::list<SerialDispatcher::ThreadCustomerData>::iterator SerialDispatcher::find
{
return std::find_if(std::begin(_thread_customer_list),
std::end (_thread_customer_list),
[thread_id](ThreadCustomerData const d) -> bool { return (d.thread_id == thread_id); });
[thread_id](ThreadCustomerData const d) -> bool
{
return (d.thread_id == thread_id);
});
}

void SerialDispatcher::prepareSerialReader(std::list<ThreadCustomerData>::iterator & iter)
Expand Down
18 changes: 15 additions & 3 deletions src/io/serial/SerialDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class SerialDispatcher : public arduino::HardwareSerial

bool _is_initialized;
rtos::Mutex _mutex;
rtos::ConditionVariable _cond;
rtos::EventFlags _data_available_for_transmit;
arduino::HardwareSerial & _serial;

rtos::Thread _thread;
Expand All @@ -84,15 +84,27 @@ class SerialDispatcher : public arduino::HardwareSerial
static int constexpr THREADSAFE_SERIAL_TRANSMIT_RINGBUFFER_SIZE = 128;
typedef arduino::RingBufferN<THREADSAFE_SERIAL_TRANSMIT_RINGBUFFER_SIZE> SerialTransmitRingbuffer;

typedef struct
class ThreadCustomerData
{
public:
ThreadCustomerData(osThreadId_t const t, uint32_t const t_event_flag)
: thread_id{t}
, thread_event_flag{t_event_flag}
, tx_buffer{}
, block_tx_buffer{false}
, rx_buffer{}
, prefix_func{nullptr}
, suffix_func{nullptr}
{ }

osThreadId_t thread_id;
uint32_t thread_event_flag;
SerialTransmitRingbuffer tx_buffer;
bool block_tx_buffer;
mbed::SharedPtr<arduino::RingBuffer> rx_buffer; /* Only when a thread has expressed interested to read from serial a receive ringbuffer is allocated. */
PrefixInjectorCallbackFunc prefix_func;
SuffixInjectorCallbackFunc suffix_func;
} ThreadCustomerData;
};

std::list<ThreadCustomerData> _thread_customer_list;

Expand Down