
Commit ce2d52c

dledford authored and torvalds committed
ipc/mqueue: add rbtree node caching support
When I wrote the first patch that added the rbtree support for message queue insertion, it sped up the case where the queue was very full drastically from the original code. It, however, slowed down the case where the queue was empty (not drastically though).

This patch caches the last freed rbtree node struct so we can quickly reuse it when we get a new message. This is the common path for any queue that very frequently goes from 0 to 1 then back to 0 messages in queue.

Andrew Morton didn't like that we were doing a GFP_ATOMIC allocation in msg_insert, so this patch attempts to speculatively allocate a new node struct outside of the spin lock when we know we need it, but will still fall back to a GFP_ATOMIC allocation if it has to.

Once I added the caching, the necessary various ret = ...; spin_unlock gyrations in mq_timedsend were getting pretty ugly, so this also slightly refactors that function to streamline the flow of the code and the function exit.

Finally, while working on getting performance back I made sure that all of the node structs were always fully initialized when they were first used, rendering the use of kzalloc unnecessary and a waste of CPU cycles.

The net result of all of this is:

1) We will avoid a GFP_ATOMIC allocation when possible, but fall back on it when necessary.

2) We will speculatively allocate a node struct using GFP_KERNEL if our cache is empty (and save the struct to our cache if it's still empty after we have obtained the spin lock).

3) The performance of the common queue empty case has significantly improved and is now much more in line with the older performance for this case.

The performance changes are:

                      Old mqueue    new mqueue    new mqueue + caching
queue empty
send/recv             305/288ns     349/318ns     310/322ns

I don't think we'll ever be able to get the recv performance back, but that's because the old recv performance was a direct result and consequence of the old method's abysmal send performance. The recv path simply must do more so that the send path does not incur such a penalty under higher queue depths.

As it turns out, the new caching code also sped up the various queue full cases relative to my last patch. That could be because of the difference between the syscall path in 3.3.4-rc5 and 3.3.4-rc6, or because of the change in code flow in the mq_timedsend routine. Regardless, I'll take it. It wasn't huge, and I *would* say it was within the margin for error, but after many repeated runs what I'm seeing is that the old numbers trend slightly higher (about 10 to 20ns depending on which test is the one running).

[akpm@linux-foundation.org: checkpatch fixes]
Signed-off-by: Doug Ledford <dledford@redhat.com>
Cc: Frederic Weisbecker <fweisbec@gmail.com>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Stephen Rothwell <sfr@canb.auug.org.au>
Cc: KOSAKI Motohiro <kosaki.motohiro@jp.fujitsu.com>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
1 parent 7820b07 commit ce2d52c
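
Before the diff, here is a minimal stand-alone sketch of the pattern the commit message describes: a one-deep node cache plus a speculative GFP_KERNEL allocation taken outside the spin lock, with GFP_ATOMIC kept only as a fallback. This is not the kernel code itself (the real implementation is in the ipc/mqueue.c diff below); struct demo_node, demo_cache, demo_lock, get_node and put_node are illustrative names only.

#include <linux/slab.h>
#include <linux/spinlock.h>

/* Illustrative only; the real code keeps the cache per queue in
 * struct mqueue_inode_info and does the work in msg_insert()/msg_get(). */
struct demo_node { struct demo_node *next; };

static struct demo_node *demo_cache;    /* at most one spare node */
static DEFINE_SPINLOCK(demo_lock);      /* protects demo_cache */

static struct demo_node *get_node(void)
{
        struct demo_node *spec = NULL, *node;

        /* Speculative allocation outside the lock, where sleeping is fine. */
        if (!demo_cache)
                spec = kmalloc(sizeof(*spec), GFP_KERNEL);

        spin_lock(&demo_lock);
        if (!demo_cache && spec) {
                demo_cache = spec;      /* park the speculative allocation */
                spec = NULL;
        }
        if (demo_cache) {
                node = demo_cache;      /* fast path: reuse the cached node */
                demo_cache = NULL;
        } else {
                /* Cache empty and speculation failed: last-resort fallback. */
                node = kmalloc(sizeof(*node), GFP_ATOMIC);
        }
        spin_unlock(&demo_lock);

        kfree(spec);    /* non-NULL only if someone refilled the cache first */
        return node;
}

static void put_node(struct demo_node *node)
{
        spin_lock(&demo_lock);
        if (!demo_cache)
                demo_cache = node;      /* keep one node for the next send */
        else
                kfree(node);            /* cache already full, free for real */
        spin_unlock(&demo_lock);
}

The reason for parking the speculative allocation before deciding mirrors the patch: once the spin lock is held we may no longer sleep, so the GFP_ATOMIC path is kept strictly as a last resort, which is what net-result points 1) and 2) in the message describe.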


ipc/mqueue.c

Lines changed: 81 additions & 23 deletions
@@ -69,6 +69,7 @@ struct mqueue_inode_info {
         wait_queue_head_t wait_q;
 
         struct rb_root msg_tree;
+        struct posix_msg_tree_node *node_cache;
         struct mq_attr attr;
 
         struct sigevent notify;
@@ -134,15 +135,20 @@ static int msg_insert(struct msg_msg *msg, struct mqueue_inode_info *info)
                 else
                         p = &(*p)->rb_right;
         }
-        leaf = kzalloc(sizeof(*leaf), GFP_ATOMIC);
-        if (!leaf)
-                return -ENOMEM;
-        rb_init_node(&leaf->rb_node);
-        INIT_LIST_HEAD(&leaf->msg_list);
+        if (info->node_cache) {
+                leaf = info->node_cache;
+                info->node_cache = NULL;
+        } else {
+                leaf = kmalloc(sizeof(*leaf), GFP_ATOMIC);
+                if (!leaf)
+                        return -ENOMEM;
+                rb_init_node(&leaf->rb_node);
+                INIT_LIST_HEAD(&leaf->msg_list);
+                info->qsize += sizeof(*leaf);
+        }
         leaf->priority = msg->m_type;
         rb_link_node(&leaf->rb_node, parent, p);
         rb_insert_color(&leaf->rb_node, &info->msg_tree);
-        info->qsize += sizeof(struct posix_msg_tree_node);
 insert_msg:
         info->attr.mq_curmsgs++;
         info->qsize += msg->m_ts;
@@ -177,22 +183,30 @@ static inline struct msg_msg *msg_get(struct mqueue_inode_info *info)
                 return NULL;
         }
         leaf = rb_entry(parent, struct posix_msg_tree_node, rb_node);
-        if (list_empty(&leaf->msg_list)) {
+        if (unlikely(list_empty(&leaf->msg_list))) {
                 pr_warn_once("Inconsistency in POSIX message queue, "
                              "empty leaf node but we haven't implemented "
                              "lazy leaf delete!\n");
                 rb_erase(&leaf->rb_node, &info->msg_tree);
-                info->qsize -= sizeof(struct posix_msg_tree_node);
-                kfree(leaf);
+                if (info->node_cache) {
+                        info->qsize -= sizeof(*leaf);
+                        kfree(leaf);
+                } else {
+                        info->node_cache = leaf;
+                }
                 goto try_again;
         } else {
                 msg = list_first_entry(&leaf->msg_list,
                                        struct msg_msg, m_list);
                 list_del(&msg->m_list);
                 if (list_empty(&leaf->msg_list)) {
                         rb_erase(&leaf->rb_node, &info->msg_tree);
-                        info->qsize -= sizeof(struct posix_msg_tree_node);
-                        kfree(leaf);
+                        if (info->node_cache) {
+                                info->qsize -= sizeof(*leaf);
+                                kfree(leaf);
+                        } else {
+                                info->node_cache = leaf;
+                        }
                 }
         }
         info->attr.mq_curmsgs--;
@@ -235,6 +249,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb,
                 info->qsize = 0;
                 info->user = NULL;      /* set when all is ok */
                 info->msg_tree = RB_ROOT;
+                info->node_cache = NULL;
                 memset(&info->attr, 0, sizeof(info->attr));
                 info->attr.mq_maxmsg = min(ipc_ns->mq_msg_max,
                                            ipc_ns->mq_msg_default);
@@ -367,6 +382,7 @@ static void mqueue_evict_inode(struct inode *inode)
         spin_lock(&info->lock);
         while ((msg = msg_get(info)) != NULL)
                 free_msg(msg);
+        kfree(info->node_cache);
         spin_unlock(&info->lock);
 
         /* Total amount of bytes accounted for the mqueue */
@@ -964,7 +980,8 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
         struct mqueue_inode_info *info;
         ktime_t expires, *timeout = NULL;
         struct timespec ts;
-        int ret;
+        struct posix_msg_tree_node *new_leaf = NULL;
+        int ret = 0;
 
         if (u_abs_timeout) {
                 int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1012,39 +1029,60 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
         msg_ptr->m_ts = msg_len;
         msg_ptr->m_type = msg_prio;
 
+        /*
+         * msg_insert really wants us to have a valid, spare node struct so
+         * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+         * fall back to that if necessary.
+         */
+        if (!info->node_cache)
+                new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
         spin_lock(&info->lock);
 
+        if (!info->node_cache && new_leaf) {
+                /* Save our speculative allocation into the cache */
+                rb_init_node(&new_leaf->rb_node);
+                INIT_LIST_HEAD(&new_leaf->msg_list);
+                info->node_cache = new_leaf;
+                info->qsize += sizeof(*new_leaf);
+                new_leaf = NULL;
+        } else {
+                kfree(new_leaf);
+        }
+
         if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
                 if (filp->f_flags & O_NONBLOCK) {
-                        spin_unlock(&info->lock);
                         ret = -EAGAIN;
                 } else {
                         wait.task = current;
                         wait.msg = (void *) msg_ptr;
                         wait.state = STATE_NONE;
                         ret = wq_sleep(info, SEND, timeout, &wait);
+                        /*
+                         * wq_sleep must be called with info->lock held, and
+                         * returns with the lock released
+                         */
+                        goto out_free;
                 }
-                if (ret < 0)
-                        free_msg(msg_ptr);
         } else {
                 receiver = wq_get_first_waiter(info, RECV);
                 if (receiver) {
                         pipelined_send(info, msg_ptr, receiver);
                 } else {
                         /* adds message to the queue */
-                        if (msg_insert(msg_ptr, info)) {
-                                free_msg(msg_ptr);
-                                ret = -ENOMEM;
-                                spin_unlock(&info->lock);
-                                goto out_fput;
-                        }
+                        ret = msg_insert(msg_ptr, info);
+                        if (ret)
+                                goto out_unlock;
                         __do_notify(info);
                 }
                 inode->i_atime = inode->i_mtime = inode->i_ctime =
                                 CURRENT_TIME;
-                spin_unlock(&info->lock);
-                ret = 0;
         }
+out_unlock:
+        spin_unlock(&info->lock);
+out_free:
+        if (ret)
+                free_msg(msg_ptr);
 out_fput:
         fput(filp);
 out:
@@ -1063,6 +1101,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
         struct ext_wait_queue wait;
         ktime_t expires, *timeout = NULL;
         struct timespec ts;
+        struct posix_msg_tree_node *new_leaf = NULL;
 
         if (u_abs_timeout) {
                 int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1098,7 +1137,26 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
                 goto out_fput;
         }
 
+        /*
+         * msg_insert really wants us to have a valid, spare node struct so
+         * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
+         * fall back to that if necessary.
+         */
+        if (!info->node_cache)
+                new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);
+
         spin_lock(&info->lock);
+
+        if (!info->node_cache && new_leaf) {
+                /* Save our speculative allocation into the cache */
+                rb_init_node(&new_leaf->rb_node);
+                INIT_LIST_HEAD(&new_leaf->msg_list);
+                info->node_cache = new_leaf;
+                info->qsize += sizeof(*new_leaf);
+        } else {
+                kfree(new_leaf);
+        }
+
         if (info->attr.mq_curmsgs == 0) {
                 if (filp->f_flags & O_NONBLOCK) {
                         spin_unlock(&info->lock);
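
As a usage note (not part of the commit), the "0 to 1 then back to 0 messages in queue" workload that the commit message calls the common case looks like this from userspace. This is a hypothetical test program using the standard POSIX mq_* API; the queue name and attributes are arbitrary, and it links with -lrt:

#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>

int main(void)
{
        struct mq_attr attr = { .mq_maxmsg = 10, .mq_msgsize = 64 };
        char buf[64];
        mqd_t q;
        int i;

        q = mq_open("/node_cache_demo", O_CREAT | O_RDWR, 0600, &attr);
        if (q == (mqd_t)-1) {
                perror("mq_open");
                return 1;
        }

        /*
         * The queue bounces between 0 and 1 messages; with no receiver
         * already blocked, each mq_send() enqueues via msg_insert() and
         * each mq_receive() dequeues via msg_get(), which is the path
         * the node cache keeps away from the allocator.
         */
        for (i = 0; i < 100000; i++) {
                mq_send(q, "ping", 5, 0);               /* 0 -> 1 */
                mq_receive(q, buf, sizeof(buf), NULL);  /* 1 -> 0 */
        }

        mq_close(q);
        mq_unlink("/node_cache_demo");
        return 0;
}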
