Skip to content

Commit

Permalink
tipc: resolve race problem at unicast message reception
Browse files Browse the repository at this point in the history
TIPC handles message cardinality and sequencing at the link layer,
before passing messages upwards to the destination sockets. During the
upcall from link to socket no locks are held. It is therefore possible,
and we see it happen occasionally, that messages arriving in different
threads and delivered in sequence still bypass each other before they
reach the destination socket. This must not happen, since it violates
the sequentiality guarantee.

We solve this by adding a new input buffer queue to the link structure.
Arriving messages are added safely to the tail of that queue by the
link, while the head of the queue is consumed, also safely, by the
receiving socket. Sequentiality is secured per socket by only allowing
buffers to be dequeued inside the socket lock. Since there may be multiple
simultaneous readers of the queue, we use a 'filter' parameter to reduce
the risk that they peek the same buffer from the queue, hence also
reducing the risk of contention on the receiving socket locks.

This solves the sequentiality problem, and seems to cause no measurable
performance degradation.

A nice side effect of this change is that lock handling in the functions
tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that
will enable future simplifications of those functions.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Jon Paul Maloy authored and davem330 committed Feb 6, 2015
1 parent 94153e3 commit c637c10
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 241 deletions.
20 changes: 11 additions & 9 deletions net/tipc/bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,8 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
void tipc_bclink_wakeup_users(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct sk_buff *skb;

while ((skb = skb_dequeue(&tn->bclink->link.waiting_sks)))
tipc_sk_rcv(net, skb);
tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
}

/**
Expand Down Expand Up @@ -271,9 +269,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
tipc_link_push_packets(tn->bcl);
bclink_set_last_sent(net);
}
if (unlikely(released && !skb_queue_empty(&tn->bcl->waiting_sks)))
if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;

exit:
tipc_bclink_unlock(net);
}
Expand Down Expand Up @@ -450,6 +447,9 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
u32 next_in;
u32 seqno;
int deferred = 0;
int pos = 0;
struct sk_buff *iskb;
struct sk_buff_head msgs;

/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tn->net_id)
Expand Down Expand Up @@ -506,7 +506,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
bcl->stats.recv_bundled += msg_msgcnt(msg);
tipc_bclink_unlock(net);
tipc_node_unlock(node);
tipc_link_bundle_rcv(net, buf);
while (tipc_msg_extract(buf, &iskb, &pos))
tipc_sk_mcast_rcv(net, iskb);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_buf_append(&node->bclink.reasm_buf, &buf);
if (unlikely(!buf && !node->bclink.reasm_buf))
Expand All @@ -527,7 +528,9 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
bclink_accept_pkt(node, seqno);
tipc_bclink_unlock(net);
tipc_node_unlock(node);
tipc_named_rcv(net, buf);
skb_queue_head_init(&msgs);
skb_queue_tail(&msgs, buf);
tipc_named_rcv(net, &msgs);
} else {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
Expand Down Expand Up @@ -944,10 +947,9 @@ int tipc_bclink_init(struct net *net)
spin_lock_init(&bclink->lock);
__skb_queue_head_init(&bcl->outqueue);
__skb_queue_head_init(&bcl->deferred_queue);
skb_queue_head_init(&bcl->waiting_sks);
skb_queue_head_init(&bcl->wakeupq);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
__skb_queue_head_init(&bclink->node.waiting_sks);
bcl->owner = &bclink->node;
bcl->owner->net = net;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
Expand Down
Loading

0 comments on commit c637c10

Please sign in to comment.