
Commit cad2929

Hoang Huu Le authored and davem330 committed
tipc: update a binding service via broadcast
Currently, updates to the binding table (adding a service binding to the name table or withdrawing one) are sent via replicast. However, when scaling clusters beyond 100 nodes/containers, this method becomes less effective because it loops through the nodes in the cluster one by one.

It is worthwhile to use broadcast to update a binding service. This way, the binding table can be updated on all peer nodes in one shot.

Broadcast is used when all peer nodes, as indicated by a new capability flag TIPC_NAMED_BCAST, support reception of this message type.

Four problems need to be considered when introducing this feature.

1) When establishing a link to a new peer node, we still update it with a unicast 'bulk' update. This may lead to race conditions, where a later broadcast publication/withdrawal bypasses the 'bulk', resulting in disordered publications, or even in a withdrawal arriving before the corresponding publication. We solve this by adding an 'is_last_bulk' bit in the last bulk message so that it can be distinguished from all other messages. Only when this message has arrived do we open up for reception of broadcast publications/withdrawals.

2) When the first legacy node is added to the cluster, all distribution switches over to the legacy 'replicast' method, while the opposite happens when the last legacy node leaves the cluster. This entails another risk of message disordering that has to be handled. We solve this by adding a sequence number to the broadcast/replicast messages, so that disordering can be discovered and corrected. Note, however, that we don't need to consider potential message loss or duplication at this protocol level.

3) Bulk messages don't contain any sequence numbers, and will always arrive in order. Hence we must exempt them from the sequence number control and deliver them unconditionally. We solve this by adding a new 'is_bulk' bit in those messages so that they can be recognized.

4) Legacy messages, which don't contain any new bits or sequence numbers, but cannot arrive out of order either, also need to be exempt from the initial synchronization and sequence number check, and delivered unconditionally. Therefore, we add another 'is_not_legacy' bit to all new messages so that they can be distinguished from legacy messages, and the latter delivered directly.

v1->v2:
- fix warning issue reported by kbuild test robot <lkp@intel.com>
- add sanity check to drop a publication message whose sequence number is lower than the agreed synch point

Signed-off-by: kernel test robot <lkp@intel.com>
Signed-off-by: Hoang Huu Le <hoang.h.le@dektech.com.au>
Acked-by: Jon Maloy <jmaloy@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
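The switch between broadcast and replicast described above hinges on a count of peers that lack the TIPC_NAMED_BCAST capability. The following userspace sketch models that bookkeeping under stated assumptions: struct peer_tracker, peer_up(), peer_down() and select_method() are hypothetical stand-ins for the rc_dests updates in tipc_named_node_up() and tipc_publ_notify() and for the sender-side choice, and NAMED_BCAST_CAP is a placeholder bit, not the kernel's actual value of TIPC_NAMED_BCAST.

```c
#include <assert.h>
#include <stdint.h>

/* Placeholder capability bit; NOT the kernel's TIPC_NAMED_BCAST value. */
#define NAMED_BCAST_CAP (1u << 11)

enum dist_method { DIST_BCAST, DIST_REPLICAST };

/* Hypothetical model of the name table's legacy-peer counter. */
struct peer_tracker {
	uint32_t rc_dests; /* peers that cannot receive named broadcasts */
};

/* Mirrors the increment done in tipc_named_node_up(). */
static void peer_up(struct peer_tracker *t, uint16_t capabilities)
{
	if (!(capabilities & NAMED_BCAST_CAP))
		t->rc_dests++;
}

/* Mirrors the decrement done in tipc_publ_notify(). */
static void peer_down(struct peer_tracker *t, uint16_t capabilities)
{
	if (!(capabilities & NAMED_BCAST_CAP)) {
		assert(t->rc_dests > 0); /* sanity check added for the sketch */
		t->rc_dests--;
	}
}

/* Broadcast only while every peer advertises the capability. */
static enum dist_method select_method(const struct peer_tracker *t)
{
	return t->rc_dests ? DIST_REPLICAST : DIST_BCAST;
}
```

For example, after one capable peer and one legacy peer come up, select_method() returns DIST_REPLICAST; once the legacy peer goes down it returns DIST_BCAST again. In the patch itself, rc_dests is read under nametbl_lock and passed to tipc_node_broadcast(), whose body is in a file not shown on this page.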
1 parent 6911967 commit cad2929


10 files changed, +177 -48 lines changed


net/tipc/bcast.c

Lines changed: 3 additions & 3 deletions
@@ -250,8 +250,8 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
  * Consumes the buffer chain.
  * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
  */
-static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
-			   u16 *cong_link_cnt)
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+		    u16 *cong_link_cnt)
 {
 	struct tipc_link *l = tipc_bc_sndlink(net);
 	struct sk_buff_head xmitq;
@@ -752,7 +752,7 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
 	nl->local = false;
 }
 
-u32 tipc_bcast_get_broadcast_mode(struct net *net)
+u32 tipc_bcast_get_mode(struct net *net)
 {
 	struct tipc_bc_base *bb = tipc_bc_base(net);
 
net/tipc/bcast.h

Lines changed: 3 additions & 1 deletion
@@ -90,6 +90,8 @@ void tipc_bcast_toggle_rcast(struct net *net, bool supp);
 int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 		    struct tipc_mc_method *method, struct tipc_nlist *dests,
 		    u16 *cong_link_cnt);
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+		    u16 *cong_link_cnt);
 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr);
@@ -101,7 +103,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);
 
-u32 tipc_bcast_get_broadcast_mode(struct net *net);
+u32 tipc_bcast_get_mode(struct net *net);
 u32 tipc_bcast_get_broadcast_ratio(struct net *net);
 
 void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,

net/tipc/link.c

Lines changed: 1 addition & 1 deletion
@@ -2745,7 +2745,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
 	void *hdr;
 	struct nlattr *attrs;
 	struct nlattr *prop;
-	u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
+	u32 bc_mode = tipc_bcast_get_mode(net);
 	u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
 
 	if (!bcl)

net/tipc/msg.h

Lines changed: 40 additions & 0 deletions
@@ -438,6 +438,36 @@ static inline void msg_set_errcode(struct tipc_msg *m, u32 err)
 	msg_set_bits(m, 1, 25, 0xf, err);
 }
 
+static inline void msg_set_bulk(struct tipc_msg *m)
+{
+	msg_set_bits(m, 1, 28, 0x1, 1);
+}
+
+static inline u32 msg_is_bulk(struct tipc_msg *m)
+{
+	return msg_bits(m, 1, 28, 0x1);
+}
+
+static inline void msg_set_last_bulk(struct tipc_msg *m)
+{
+	msg_set_bits(m, 1, 27, 0x1, 1);
+}
+
+static inline u32 msg_is_last_bulk(struct tipc_msg *m)
+{
+	return msg_bits(m, 1, 27, 0x1);
+}
+
+static inline void msg_set_non_legacy(struct tipc_msg *m)
+{
+	msg_set_bits(m, 1, 26, 0x1, 1);
+}
+
+static inline u32 msg_is_legacy(struct tipc_msg *m)
+{
+	return !msg_bits(m, 1, 26, 0x1);
+}
+
 static inline u32 msg_reroute_cnt(struct tipc_msg *m)
 {
 	return msg_bits(m, 1, 21, 0xf);
@@ -567,6 +597,16 @@ static inline void msg_set_origport(struct tipc_msg *m, u32 p)
 	msg_set_word(m, 4, p);
 }
 
+static inline u16 msg_named_seqno(struct tipc_msg *m)
+{
+	return msg_bits(m, 4, 0, 0xffff);
+}
+
+static inline void msg_set_named_seqno(struct tipc_msg *m, u16 n)
+{
+	msg_set_bits(m, 4, 0, 0xffff, n);
+}
+
 static inline u32 msg_destport(struct tipc_msg *m)
 {
 	return msg_word(m, 5);
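The new flags and the sequence number are plain bit fields in the TIPC header. A minimal userspace sketch, assuming a header stored as an array of 32-bit words in network byte order: bits_get() and bits_set() are hypothetical re-implementations modeled on the msg_bits()/msg_set_bits() helpers used above, and struct fake_msg is a stand-in for struct tipc_msg. The bit positions are copied from the accessors added to msg.h.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for struct tipc_msg: header words kept in
 * network byte order, as on the wire. */
struct fake_msg {
	uint32_t hdr[10];
};

/* Read the field of width "mask" starting at bit "pos" of word "w". */
static uint32_t bits_get(const struct fake_msg *m, int w, int pos,
			 uint32_t mask)
{
	return (ntohl(m->hdr[w]) >> pos) & mask;
}

/* Overwrite that field without touching neighboring bits. */
static void bits_set(struct fake_msg *m, int w, int pos, uint32_t mask,
		     uint32_t val)
{
	assert(w >= 0 && w < 10);
	val = (val & mask) << pos;
	mask <<= pos;
	m->hdr[w] &= ~htonl(mask);
	m->hdr[w] |= htonl(val);
}

/* Word 1: bit 28 = bulk, bit 27 = last bulk, bit 26 = non-legacy. */
static void set_bulk(struct fake_msg *m) { bits_set(m, 1, 28, 0x1, 1); }
static uint32_t is_bulk(const struct fake_msg *m) { return bits_get(m, 1, 28, 0x1); }
static void set_last_bulk(struct fake_msg *m) { bits_set(m, 1, 27, 0x1, 1); }
static uint32_t is_last_bulk(const struct fake_msg *m) { return bits_get(m, 1, 27, 0x1); }
static void set_non_legacy(struct fake_msg *m) { bits_set(m, 1, 26, 0x1, 1); }
/* A cleared non-legacy bit means the sender predates this change. */
static uint32_t is_legacy(const struct fake_msg *m) { return !bits_get(m, 1, 26, 0x1); }

/* Word 4, bits 0-15: named sequence number. */
static uint16_t named_seqno(const struct fake_msg *m)
{
	return (uint16_t)bits_get(m, 4, 0, 0xffff);
}

static void set_named_seqno(struct fake_msg *m, uint16_t n)
{
	bits_set(m, 4, 0, 0xffff, n);
}
```

Setting all three flags leaves word 1 equal to 0x1C000000 in host order (bits 26, 27 and 28). The msg_set_errcode() context line shows that bits 25 to 28 of word 1 also hold the error code, and msg_set_origport() writes all of word 4, so the new fields appear to reuse header space that name-distribution messages do not otherwise need; that reading is an inference from this diff, not something the commit message states.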

net/tipc/name_distr.c

Lines changed: 87 additions & 29 deletions
@@ -102,7 +102,8 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
 		pr_warn("Publication distribution failure\n");
 		return NULL;
 	}
-
+	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
+	msg_set_non_legacy(buf_msg(skb));
 	item = (struct distr_item *)msg_data(buf_msg(skb));
 	publ_to_item(item, publ);
 	return skb;
@@ -114,24 +115,25 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ)
 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
 {
 	struct name_table *nt = tipc_name_table(net);
-	struct sk_buff *buf;
 	struct distr_item *item;
+	struct sk_buff *skb;
 
 	write_lock_bh(&nt->cluster_scope_lock);
 	list_del(&publ->binding_node);
 	write_unlock_bh(&nt->cluster_scope_lock);
 	if (publ->scope == TIPC_NODE_SCOPE)
 		return NULL;
 
-	buf = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
-	if (!buf) {
+	skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
+	if (!skb) {
 		pr_warn("Withdrawal distribution failure\n");
 		return NULL;
 	}
-
-	item = (struct distr_item *)msg_data(buf_msg(buf));
+	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
+	msg_set_non_legacy(buf_msg(skb));
+	item = (struct distr_item *)msg_data(buf_msg(skb));
 	publ_to_item(item, publ);
-	return buf;
+	return skb;
 }
 
 /**
@@ -141,14 +143,15 @@ struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ)
  * @pls: linked list of publication items to be packed into buffer chain
  */
 static void named_distribute(struct net *net, struct sk_buff_head *list,
-			     u32 dnode, struct list_head *pls)
+			     u32 dnode, struct list_head *pls, u16 seqno)
 {
 	struct publication *publ;
 	struct sk_buff *skb = NULL;
 	struct distr_item *item = NULL;
 	u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
 			ITEM_SIZE) * ITEM_SIZE;
 	u32 msg_rem = msg_dsz;
+	struct tipc_msg *hdr;
 
 	list_for_each_entry(publ, pls, binding_node) {
 		/* Prepare next buffer: */
@@ -159,8 +162,11 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
 				pr_warn("Bulk publication failure\n");
 				return;
 			}
-			msg_set_bc_ack_invalid(buf_msg(skb), true);
-			item = (struct distr_item *)msg_data(buf_msg(skb));
+			hdr = buf_msg(skb);
+			msg_set_bc_ack_invalid(hdr, true);
+			msg_set_bulk(hdr);
+			msg_set_non_legacy(hdr);
+			item = (struct distr_item *)msg_data(hdr);
 		}
 
 		/* Pack publication into message: */
@@ -176,24 +182,35 @@ static void named_distribute(struct net *net, struct sk_buff_head *list,
 		}
 	}
 	if (skb) {
-		msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
+		hdr = buf_msg(skb);
+		msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
 		skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
 		__skb_queue_tail(list, skb);
 	}
+	hdr = buf_msg(skb_peek_tail(list));
+	msg_set_last_bulk(hdr);
+	msg_set_named_seqno(hdr, seqno);
 }
 
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(struct net *net, u32 dnode)
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
 {
 	struct name_table *nt = tipc_name_table(net);
+	struct tipc_net *tn = tipc_net(net);
 	struct sk_buff_head head;
+	u16 seqno;
 
 	__skb_queue_head_init(&head);
+	spin_lock_bh(&tn->nametbl_lock);
+	if (!(capabilities & TIPC_NAMED_BCAST))
+		nt->rc_dests++;
+	seqno = nt->snd_nxt;
+	spin_unlock_bh(&tn->nametbl_lock);
 
 	read_lock_bh(&nt->cluster_scope_lock);
-	named_distribute(net, &head, dnode, &nt->cluster_scope);
+	named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
 	tipc_node_xmit(net, &head, dnode, 0);
 	read_unlock_bh(&nt->cluster_scope_lock);
 }
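named_distribute() splits the cluster-scope publications into MTU-sized bulk messages, marks every one with the bulk and non-legacy bits, and stamps only the tail with the last-bulk bit and the synch-point sequence number sampled in tipc_named_node_up(). A userspace sketch of that packing logic, counting items instead of building sk_buffs; struct bulk_msg and pack_bulk() are hypothetical, and HDR_SIZE and ITEM_SIZE are illustrative values assumed to correspond to INT_H_SIZE and sizeof(struct distr_item).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative sizes; both are assumptions for this sketch. */
#define HDR_SIZE  40u  /* assumed to match INT_H_SIZE */
#define ITEM_SIZE 20u  /* assumed sizeof(struct distr_item) */
#define MAX_MSGS  64

/* Hypothetical summary of one bulk message. */
struct bulk_msg {
	uint32_t items;  /* publications packed into this message */
	bool bulk;
	bool non_legacy;
	bool last_bulk;
	uint16_t seqno;  /* only meaningful on the last bulk message */
};

/* Packs n publications; returns the number of messages built. */
static int pack_bulk(uint32_t n, uint32_t mtu, uint16_t seqno,
		     struct bulk_msg out[MAX_MSGS])
{
	uint32_t per_msg;
	int cnt = 0;

	assert(mtu >= HDR_SIZE + ITEM_SIZE);
	/* Whole items that fit after the header; the kernel computes the
	 * same bound in bytes as msg_dsz. */
	per_msg = (mtu - HDR_SIZE) / ITEM_SIZE;

	while (n) {
		uint32_t take = n < per_msg ? n : per_msg;

		assert(cnt < MAX_MSGS);
		out[cnt++] = (struct bulk_msg){ .items = take, .bulk = true,
						.non_legacy = true };
		n -= take;
	}
	/* Only the final message carries the synch point. */
	if (cnt > 0) {
		out[cnt - 1].last_bulk = true;
		out[cnt - 1].seqno = seqno;
	}
	return cnt;
}
```

With a 120-byte MTU each message holds (120 - 40) / 20 = 4 items, so 10 publications yield messages of 4, 4 and 2 items, and only the third carries last_bulk and the seqno. The sketch skips the tail marking when no message was built, whereas the hunk above marks skb_peek_tail(list) unconditionally.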
@@ -245,13 +262,21 @@ static void tipc_dist_queue_purge(struct net *net, u32 addr)
 	spin_unlock_bh(&tn->nametbl_lock);
 }
 
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr)
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
+		      u32 addr, u16 capabilities)
 {
+	struct name_table *nt = tipc_name_table(net);
+	struct tipc_net *tn = tipc_net(net);
+
 	struct publication *publ, *tmp;
 
 	list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
 		tipc_publ_purge(net, publ, addr);
 	tipc_dist_queue_purge(net, addr);
+	spin_lock_bh(&tn->nametbl_lock);
+	if (!(capabilities & TIPC_NAMED_BCAST))
+		nt->rc_dests--;
+	spin_unlock_bh(&tn->nametbl_lock);
 }
 
 /**
@@ -295,29 +320,62 @@ static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
 	return false;
 }
 
+static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
+					  u16 *rcv_nxt, bool *open)
+{
+	struct sk_buff *skb, *tmp;
+	struct tipc_msg *hdr;
+	u16 seqno;
+
+	skb_queue_walk_safe(namedq, skb, tmp) {
+		skb_linearize(skb);
+		hdr = buf_msg(skb);
+		seqno = msg_named_seqno(hdr);
+		if (msg_is_last_bulk(hdr)) {
+			*rcv_nxt = seqno;
+			*open = true;
+		}
+
+		if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
+			__skb_unlink(skb, namedq);
+			return skb;
+		}
+
+		if (*open && (*rcv_nxt == seqno)) {
+			(*rcv_nxt)++;
+			__skb_unlink(skb, namedq);
+			return skb;
+		}
+
+		if (less(seqno, *rcv_nxt)) {
+			__skb_unlink(skb, namedq);
+			kfree_skb(skb);
+			continue;
+		}
+	}
+	return NULL;
+}
+
 /**
  * tipc_named_rcv - process name table update messages sent by another node
  */
-void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+		    u16 *rcv_nxt, bool *open)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_msg *msg;
+	struct tipc_net *tn = tipc_net(net);
 	struct distr_item *item;
-	uint count;
-	u32 node;
+	struct tipc_msg *hdr;
 	struct sk_buff *skb;
-	int mtype;
+	u32 count, node;
 
 	spin_lock_bh(&tn->nametbl_lock);
-	for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
-		skb_linearize(skb);
-		msg = buf_msg(skb);
-		mtype = msg_type(msg);
-		item = (struct distr_item *)msg_data(msg);
-		count = msg_data_sz(msg) / ITEM_SIZE;
-		node = msg_orignode(msg);
+	while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
+		hdr = buf_msg(skb);
+		node = msg_orignode(hdr);
+		item = (struct distr_item *)msg_data(hdr);
+		count = msg_data_sz(hdr) / ITEM_SIZE;
 		while (count--) {
-			tipc_update_nametbl(net, item, node, mtype);
+			tipc_update_nametbl(net, item, node, msg_type(hdr));
 			item++;
 		}
 		kfree_skb(skb);
@@ -345,6 +403,6 @@ void tipc_named_reinit(struct net *net)
 		publ->node = self;
 	list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
 		publ->node = self;
-
+	nt->rc_dests = 0;
 	spin_unlock_bh(&tn->nametbl_lock);
 }
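tipc_named_dequeue() applies its rules in order: a last-bulk message sets rcv_nxt and opens the gate; bulk and legacy messages are delivered unconditionally; once the gate is open, a message whose seqno equals rcv_nxt is delivered; a message older than rcv_nxt is dropped; anything else stays queued. The sketch below replays those rules in userspace over an array instead of an sk_buff_head. The queue type, q_push(), q_remove() and named_dequeue() are hypothetical, and seq_less() is modeled on TIPC's 16-bit less() helper, which this diff calls but does not define.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define QCAP 16

/* Hypothetical stand-in for a queued name-distribution message. */
struct named_msg {
	const char *name;
	uint16_t seqno;
	bool bulk, last_bulk, legacy;
};

struct named_queue {
	struct named_msg items[QCAP];
	int len;
};

/* Modulo-2^16 "a precedes b", modeled on TIPC's less(). */
static bool seq_less(uint16_t a, uint16_t b)
{
	uint16_t d = (uint16_t)(b - a);

	return d != 0 && d < 0x8000u;
}

static void q_push(struct named_queue *q, struct named_msg m)
{
	assert(q->len < QCAP);
	q->items[q->len++] = m;
}

static void q_remove(struct named_queue *q, int i)
{
	memmove(&q->items[i], &q->items[i + 1],
		(size_t)(q->len - i - 1) * sizeof(q->items[0]));
	q->len--;
}

/* Copies the next deliverable message to *out and returns true,
 * or returns false when nothing can be delivered yet. */
static bool named_dequeue(struct named_queue *q, uint16_t *rcv_nxt,
			  bool *open, struct named_msg *out)
{
	int i = 0;

	while (i < q->len) {
		const struct named_msg *m = &q->items[i];

		if (m->last_bulk) {          /* synch point reached */
			*rcv_nxt = m->seqno;
			*open = true;
		}
		if (m->bulk || m->legacy) {  /* exempt from seqno checks */
			*out = *m;
			q_remove(q, i);
			return true;
		}
		if (*open && *rcv_nxt == m->seqno) { /* next in sequence */
			(*rcv_nxt)++;
			*out = *m;
			q_remove(q, i);
			return true;
		}
		if (seq_less(m->seqno, *rcv_nxt)) {  /* stale: drop it */
			q_remove(q, i);
			continue;
		}
		i++;                         /* too early: keep queued */
	}
	return false;
}
```

For example, if messages arrive as pub6, bulk1, pub5, pub4, bulk2 (bulk2 being the last bulk message, carrying seqno 5), repeated calls deliver bulk1, bulk2, pub5 and pub6 in that order, while pub4 is dropped because it precedes the synch point, as the v1->v2 sanity check intends.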

net/tipc/name_distr.h

Lines changed: 6 additions & 3 deletions
@@ -67,11 +67,14 @@ struct distr_item {
 	__be32 key;
 };
 
+void tipc_named_bcast(struct net *net, struct sk_buff *skb);
 struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
-void tipc_named_node_up(struct net *net, u32 dnode);
-void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+		    u16 *rcv_nxt, bool *open);
 void tipc_named_reinit(struct net *net);
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
+		      u32 addr, u16 capabilities);
 
 #endif

net/tipc/name_table.c

Lines changed: 7 additions & 2 deletions
@@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
 	struct tipc_net *tn = tipc_net(net);
 	struct publication *p = NULL;
 	struct sk_buff *skb = NULL;
+	u32 rc_dests;
 
 	spin_lock_bh(&tn->nametbl_lock);
 
@@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
 		nt->local_publ_count++;
 		skb = tipc_named_publish(net, p);
 	}
+	rc_dests = nt->rc_dests;
 exit:
 	spin_unlock_bh(&tn->nametbl_lock);
 
 	if (skb)
-		tipc_node_broadcast(net, skb);
+		tipc_node_broadcast(net, skb, rc_dests);
 	return p;
+
 }
 
 /**
@@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
 	u32 self = tipc_own_addr(net);
 	struct sk_buff *skb = NULL;
 	struct publication *p;
+	u32 rc_dests;
 
 	spin_lock_bh(&tn->nametbl_lock);
 
@@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
 		pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
 		       type, lower, upper, key);
 	}
+	rc_dests = nt->rc_dests;
 	spin_unlock_bh(&tn->nametbl_lock);
 
 	if (skb) {
-		tipc_node_broadcast(net, skb);
+		tipc_node_broadcast(net, skb, rc_dests);
 		return 1;
 	}
 	return 0;

net/tipc/name_table.h

Lines changed: 2 additions & 0 deletions
@@ -106,6 +106,8 @@ struct name_table {
 	struct list_head cluster_scope;
 	rwlock_t cluster_scope_lock;
 	u32 local_publ_count;
+	u32 rc_dests;
+	u32 snd_nxt;
 };
 
 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
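struct name_table keeps snd_nxt as a u32, but msg_set_named_seqno() takes a u16 and writes only bits 0 to 15 of word 4, and the receive side tracks rcv_nxt as a u16. Only the low 16 bits therefore travel on the wire, and ordering has to be judged modulo 2^16. A small sketch of that wraparound, assuming nothing beyond the types shown in this diff: next_wire_seqno() is a hypothetical helper mirroring the nt->snd_nxt++ argument passed to msg_set_named_seqno(), and seq_less() is modeled on TIPC's less() helper.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Modulo-2^16 "a precedes b", modeled on TIPC's less(). */
static bool seq_less(uint16_t a, uint16_t b)
{
	uint16_t d = (uint16_t)(b - a);

	return d != 0 && d < 0x8000u;
}

/* Hypothetical sender step: the 32-bit counter advances, but only its
 * low 16 bits are placed in the message header. */
static uint16_t next_wire_seqno(uint32_t *snd_nxt)
{
	uint16_t seqno = (uint16_t)*snd_nxt;

	(*snd_nxt)++;
	return seqno;
}
```

Starting from snd_nxt = 0xFFFF, two calls produce wire seqnos 0xFFFF and 0 while the counter itself reaches 0x10001; seq_less(0xFFFF, 0) is true, so a receiver comparing 16-bit values still treats 0 as the successor of 0xFFFF.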
