@@ -1167,19 +1167,24 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
 	struct udp_sock *up = udp_sk(sk);
+	struct sk_buff_head *sk_queue;
 	int amt;
 
 	if (likely(partial)) {
 		up->forward_deficit += size;
 		size = up->forward_deficit;
 		if (size < (sk->sk_rcvbuf >> 2) &&
-		    !skb_queue_empty(&sk->sk_receive_queue))
+		    !skb_queue_empty(&up->reader_queue))
 			return;
 	} else {
 		size += up->forward_deficit;
 	}
 	up->forward_deficit = 0;
 
+	/* acquire the sk_receive_queue for fwd allocated memory scheduling */
+	sk_queue = &sk->sk_receive_queue;
+	spin_lock(&sk_queue->lock);
+
 	sk->sk_forward_alloc += size;
 	amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
 	sk->sk_forward_alloc -= amt;
@@ -1188,9 +1193,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
 		__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
 
 	atomic_sub(size, &sk->sk_rmem_alloc);
+
+	/* this can save us from acquiring the rx queue lock on next receive */
+	skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+	spin_unlock(&sk_queue->lock);
 }
 
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
  * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
  * This avoids a cache line miss while receive_queue lock is held.
  * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
@@ -1306,10 +1316,12 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
 void udp_destruct_sock(struct sock *sk)
 {
 	/* reclaim completely the forward allocated memory */
+	struct udp_sock *up = udp_sk(sk);
 	unsigned int total = 0;
 	struct sk_buff *skb;
 
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+	skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+	while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
 		total += skb->truesize;
 		kfree_skb(skb);
 	}
@@ -1321,6 +1333,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);
 
 int udp_init_sock(struct sock *sk)
 {
+	skb_queue_head_init(&udp_sk(sk)->reader_queue);
 	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
@@ -1338,6 +1351,26 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);
 
+static struct sk_buff *__first_packet_length(struct sock *sk,
+					     struct sk_buff_head *rcvq,
+					     int *total)
+{
+	struct sk_buff *skb;
+
+	while ((skb = skb_peek(rcvq)) != NULL &&
+	       udp_lib_checksum_complete(skb)) {
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+				IS_UDPLITE(sk));
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+				IS_UDPLITE(sk));
+		atomic_inc(&sk->sk_drops);
+		__skb_unlink(skb, rcvq);
+		*total += skb->truesize;
+		kfree_skb(skb);
+	}
+	return skb;
+}
+
 /**
  * first_packet_length - return length of first packet in receive queue
  * @sk: socket
@@ -1347,22 +1380,20 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
  */
 static int first_packet_length(struct sock *sk)
 {
-	struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+	struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
 	struct sk_buff *skb;
 	int total = 0;
 	int res;
 
 	spin_lock_bh(&rcvq->lock);
-	while ((skb = skb_peek(rcvq)) != NULL &&
-	       udp_lib_checksum_complete(skb)) {
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
-				IS_UDPLITE(sk));
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
-				IS_UDPLITE(sk));
-		atomic_inc(&sk->sk_drops);
-		__skb_unlink(skb, rcvq);
-		total += skb->truesize;
-		kfree_skb(skb);
+	skb = __first_packet_length(sk, rcvq, &total);
+	if (!skb && !skb_queue_empty(sk_queue)) {
+		spin_lock(&sk_queue->lock);
+		skb_queue_splice_tail_init(sk_queue, rcvq);
+		spin_unlock(&sk_queue->lock);
+
+		skb = __first_packet_length(sk, rcvq, &total);
 	}
 	res = skb ? skb->len : -1;
 	if (total)
@@ -1400,6 +1431,79 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 }
 EXPORT_SYMBOL(udp_ioctl);
 
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err)
+{
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+	struct sk_buff_head *queue;
+	struct sk_buff *last;
+	long timeo;
+	int error;
+
+	queue = &udp_sk(sk)->reader_queue;
+	flags |= noblock ? MSG_DONTWAIT : 0;
+	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+	do {
+		struct sk_buff *skb;
+
+		error = sock_error(sk);
+		if (error)
+			break;
+
+		error = -EAGAIN;
+		*peeked = 0;
+		do {
+			int _off = *off;
+
+			spin_lock_bh(&queue->lock);
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			if (skb) {
+				spin_unlock_bh(&queue->lock);
+				*off = _off;
+				return skb;
+			}
+
+			if (skb_queue_empty(sk_queue)) {
+				spin_unlock_bh(&queue->lock);
+				goto busy_check;
+			}
+
+			/* refill the reader queue and walk it again */
+			_off = *off;
+			spin_lock(&sk_queue->lock);
+			skb_queue_splice_tail_init(sk_queue, queue);
+			spin_unlock(&sk_queue->lock);
+
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			spin_unlock_bh(&queue->lock);
+			if (skb) {
+				*off = _off;
+				return skb;
+			}
+
+busy_check:
+			if (!sk_can_busy_loop(sk))
+				break;
+
+			sk_busy_loop(sk, flags & MSG_DONTWAIT);
+		} while (!skb_queue_empty(sk_queue));
+
+		/* sk_queue is empty, reader_queue may contain peeked packets */
+	} while (timeo &&
+		 !__skb_wait_for_more_packets(sk, &error, &timeo,
+					      (struct sk_buff *)sk_queue));
+
+	*err = error;
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
 /*
  * This should be easy, if there is something there we
  * return it, otherwise we block.
@@ -1490,7 +1594,8 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock,
 	return err;
 
 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
 	}
@@ -2325,6 +2430,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
 	unsigned int mask = datagram_poll(file, sock, wait);
 	struct sock *sk = sock->sk;
 
+	if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+		mask |= POLLIN | POLLRDNORM;
+
 	sock_rps_record_flow(sk);
 
 	/* Check for false positives due to checksum errors */
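
A note on the scheme the diff implements: UDP sockets now have two receive queues, a producer-facing sk_receive_queue that softirq context fills under its own lock, and a reader-private reader_queue that recvmsg() drains, refilled by splicing the entire backlog over in a single locked operation. Below is a minimal, self-contained userspace sketch of that two-queue splice pattern (hypothetical names, pthread mutexes standing in for the kernel spinlocks; an illustration of the idea, not part of the patch):

/* two_queue_sketch.c -- illustration only, hypothetical names */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct node {
	int val;
	struct node *next;
};

struct queue {
	struct node *head, *tail;
};

static struct queue shared_q;	/* analogue of sk->sk_receive_queue */
static struct queue reader_q;	/* analogue of udp_sk(sk)->reader_queue */
static pthread_mutex_t shared_lock = PTHREAD_MUTEX_INITIALIZER;

static void q_push(struct queue *q, struct node *n)
{
	n->next = NULL;
	if (q->tail)
		q->tail->next = n;
	else
		q->head = n;
	q->tail = n;
}

static struct node *q_pop(struct queue *q)
{
	struct node *n = q->head;

	if (n) {
		q->head = n->next;
		if (!q->head)
			q->tail = NULL;
	}
	return n;
}

/* move everything from src to the tail of dst, leaving src empty;
 * the userspace analogue of skb_queue_splice_tail_init() */
static void q_splice_tail_init(struct queue *src, struct queue *dst)
{
	if (!src->head)
		return;
	if (dst->tail)
		dst->tail->next = src->head;
	else
		dst->head = src->head;
	dst->tail = src->tail;
	src->head = src->tail = NULL;
}

/* producer side: every enqueue takes the shared lock, as softirq does */
static void producer_push(int val)
{
	struct node *n = malloc(sizeof(*n));

	n->val = val;
	pthread_mutex_lock(&shared_lock);
	q_push(&shared_q, n);
	pthread_mutex_unlock(&shared_lock);
}

/* reader side: pop from the private queue without the shared lock;
 * take it only to splice the whole backlog over when the queue runs dry */
static struct node *reader_pop(void)
{
	struct node *n = q_pop(&reader_q);

	if (!n) {
		pthread_mutex_lock(&shared_lock);
		q_splice_tail_init(&shared_q, &reader_q);
		pthread_mutex_unlock(&shared_lock);
		n = q_pop(&reader_q);
	}
	return n;
}

int main(void)
{
	struct node *n;
	int i;

	for (i = 0; i < 4; i++)		/* a burst of four "packets" */
		producer_push(i);

	/* one lock round-trip on shared_lock serves all four dequeues */
	while ((n = reader_pop()) != NULL) {
		printf("got %d\n", n->val);
		free(n);
	}
	return 0;
}

The payoff mirrored by the patch: a burst of N packets costs the reader one contended lock round-trip instead of N, and peeked packets can stay on the private queue. The kernel version still locks reader_queue itself, since concurrent readers and MSG_PEEK need it, but that lock is never contended by the softirq enqueue path, which is the point of the split.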