Skip to content

Commit

Permalink
sched/mqueue: add support of tick based send() and receive()
Browse files Browse the repository at this point in the history
Add tick based send()/receive() to reduce the cost of time conversion

Signed-off-by: chao an <anchao@lixiang.com>
  • Loading branch information
anchao committed May 10, 2024
1 parent 0ed714b commit 991332d
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 65 deletions.
84 changes: 84 additions & 0 deletions include/nuttx/mqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,52 @@ int file_mq_timedsend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio,
FAR const struct timespec *abstime);

/****************************************************************************
* Name: file_mq_ticksend
*
* Description:
* This function adds the specified message (msg) to the message queue
* (mq). file_mq_ticksend() behaves just like mq_send(), except that if
* the queue is full and the O_NONBLOCK flag is not enabled for the
* message queue description, then abstime points to a structure which
* specifies a ceiling on the time for which the call will block.
*
* file_mq_ticksend() is functionally equivalent to mq_timedsend() except
* that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedsend() for a more complete description of the
* behavior of this function
*
* Input Parameters:
* mq - Message queue descriptor
* msg - Message to send
* msglen - The length of the message in bytes
* prio - The priority of the message
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedsend() for the list list valid return values).
*
* EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
* message queue description referred to by mq.
* EINVAL Either msg or mq is NULL or the value of prio is invalid.
* EBADF Message queue opened not opened for writing.
* EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
* message queue.
* EINTR The call was interrupted by a signal handler.
*
****************************************************************************/

int file_mq_ticksend(FAR struct file *mq, FAR const char *msg,
size_t msglen, unsigned int prio, sclock_t ticks);

/****************************************************************************
* Name: file_mq_receive
*
Expand Down Expand Up @@ -656,6 +702,44 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime);

/****************************************************************************
* Name: file_mq_tickreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_tickreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_tickreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/

ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
sclock_t ticks);

/****************************************************************************
* Name: file_mq_setattr
*
Expand Down
135 changes: 108 additions & 27 deletions sched/mqueue/mq_timedreceive.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,22 @@ static void nxmq_rcvtimeout(wdparm_t pid)
****************************************************************************/

/****************************************************************************
* Name: file_mq_timedreceive
* Name: file_mq_timedreceive_internal
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
* This is an internal function of file_mq_timedreceive()/
* file_mq_tickreceive(), please refer to the detailed description for
* more information.
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
* isabs - Flag to wait delay from absolute time or relative tick.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
Expand All @@ -134,9 +127,11 @@ static void nxmq_rcvtimeout(wdparm_t pid)
*
****************************************************************************/

ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime)
static ssize_t
file_mq_timedreceive_internal(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime,
sclock_t ticks, bool isabs)
{
FAR struct tcb_s *rtcb = this_task();
FAR struct mqueue_inode_s *msgq;
Expand All @@ -156,7 +151,8 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
return ret;
}

if (!abstime || abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
if (isabs &&
(!abstime || abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000))
{
return -EINVAL;
}
Expand All @@ -175,28 +171,29 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,

if (list_is_empty(&msgq->msglist))
{
sclock_t ticks;

/* Convert the timespec to clock ticks. We must have interrupts
* disabled here so that this time stays valid until the wait begins.
*/
if (isabs)
{
/* Convert the timespec to clock ticks.
* We must have interrupts disabled here so that
* this time stays valid until the wait begins.
*/

ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
}

/* If the time has already expired and the message queue is empty,
* return immediately.
*/

if (ret == OK && ticks <= 0)
{
ret = ETIMEDOUT;
ret = -ETIMEDOUT;
}

/* Handle any time-related errors */

if (ret != OK)
{
ret = -ret;
goto errout_in_critical_section;
}

Expand Down Expand Up @@ -238,6 +235,89 @@ ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
return ret;
}

/****************************************************************************
* Name: file_mq_timedreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_timedreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_timedreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* abstime - the absolute time to wait until a timeout is declared.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/

ssize_t file_mq_timedreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
FAR const struct timespec *abstime)
{
return file_mq_timedreceive_internal(mq, msg, msglen,
prio, abstime, 0, true);
}

/****************************************************************************
* Name: file_mq_tickreceive
*
* Description:
* This function receives the oldest of the highest priority messages from
* the message queue specified by "mq." If the message queue is empty
* and O_NONBLOCK was not set, file_mq_tickreceive() will block until a
* message is added to the message queue (or until a timeout occurs).
*
* file_mq_tickreceive() is an internal OS interface. It is functionally
* equivalent to mq_timedreceive() except that:
*
* - It is not a cancellation point, and
* - It does not modify the errno value.
*
* See comments with mq_timedreceive() for a more complete description of
* the behavior of this function
*
* Input Parameters:
* mq - Message Queue Descriptor
* msg - Buffer to receive the message
* msglen - Size of the buffer in bytes
* prio - If not NULL, the location to store message priority.
* ticks - Ticks to wait from the start time until the semaphore is
* posted.
*
* Returned Value:
* This is an internal OS interface and should not be used by applications.
* It follows the NuttX internal error return policy: Zero (OK) is
* returned on success. A negated errno value is returned on failure.
* (see mq_timedreceive() for the list list valid return values).
*
****************************************************************************/

ssize_t file_mq_tickreceive(FAR struct file *mq, FAR char *msg,
size_t msglen, FAR unsigned int *prio,
sclock_t ticks)
{
return file_mq_timedreceive_internal(mq, msg, msglen,
prio, NULL, ticks, false);
}

/****************************************************************************
* Name: nxmq_timedreceive
*
Expand Down Expand Up @@ -284,7 +364,8 @@ ssize_t nxmq_timedreceive(mqd_t mqdes, FAR char *msg, size_t msglen,
return ret;
}

return file_mq_timedreceive(filep, msg, msglen, prio, abstime);
return file_mq_timedreceive_internal(filep, msg, msglen,
prio, abstime, 0, true);
}

/****************************************************************************
Expand Down
Loading

0 comments on commit 991332d

Please sign in to comment.