@@ -62,17 +62,17 @@ enum rpcrdma_chunktype {
 };
 
 static const char transfertypes[][12] = {
-        "pure inline",  /* no chunks */
-        " read chunk ", /* some argument via rdma read */
-        "*read chunk ", /* entire request via rdma read */
-        "write chunk ", /* some result via rdma write */
+        "inline",       /* no chunks */
+        "read list",    /* some argument via rdma read */
+        "*read list",   /* entire request via rdma read */
+        "write list",   /* some result via rdma write */
         "reply chunk"   /* entire reply via rdma write */
 };
 
 /* Returns size of largest RPC-over-RDMA header in a Call message
  *
- * The client marshals only one chunk list per Call message.
- * The largest list is the Read list.
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
  */
 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
 {
@@ -85,6 +85,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
         maxsegs += 2;   /* segment for head and tail buffers */
         size = maxsegs * sizeof(struct rpcrdma_read_chunk);
 
+        /* Minimal Reply chunk size */
+        size += sizeof(__be32); /* segment count */
+        size += sizeof(struct rpcrdma_segment);
+        size += sizeof(__be32); /* list discriminator */
+
         dprintk("RPC: %s: max call header size = %u\n",
                 __func__, size);
         return size;
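
For a rough sense of what this maximum works out to, here is a back-of-the-envelope sketch in plain C. It assumes the conventional on-the-wire sizes (an rpcrdma_segment is 16 bytes of HLOO; an rpcrdma_read_chunk adds a discriminator and a position word, 24 bytes total) and picks maxsegs = 8 arbitrarily; any fixed header fields accounted for elsewhere in the function come on top of this.

#include <stdio.h>

int main(void)
{
        unsigned int maxsegs = 8;       /* arbitrary example value */
        unsigned int size;

        maxsegs += 2;                   /* head and tail buffers */
        size = maxsegs * 24;            /* full-size Read list */

        size += 4;                      /* Reply chunk segment count */
        size += 16;                     /* one HLOO segment */
        size += 4;                      /* list discriminator */

        printf("chunk-list portion = %u bytes\n", size);        /* 264 */
        return 0;
}
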
@@ -431,6 +436,209 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
         return n;
 }
 
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+        *iptr++ = cpu_to_be32(seg->mr_rkey);
+        *iptr++ = cpu_to_be32(seg->mr_len);
+        return xdr_encode_hyper(iptr, seg->mr_base);
+}
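
To see exactly what this helper leaves in the header buffer, here is a minimal userspace mock of it; htonl() stands in for cpu_to_be32(), and the rkey, length, and base values are invented for illustration only.

#include <stdint.h>
#include <stdio.h>
#include <arpa/inet.h>

static uint32_t *encode_segment(uint32_t *p, uint32_t rkey,
                                uint32_t len, uint64_t base)
{
        *p++ = htonl(rkey);
        *p++ = htonl(len);
        *p++ = htonl(base >> 32);               /* xdr_encode_hyper(): high word */
        *p++ = htonl(base & 0xffffffff);        /* then low word */
        return p;
}

int main(void)
{
        uint32_t buf[4];

        encode_segment(buf, 0xdeadbeef, 4096, 0x100000000ULL);
        for (int i = 0; i < 4; i++)
                printf("%08x ", ntohl(buf[i]));
        printf("\n");   /* deadbeef 00001000 00000001 00000000 */
        return 0;
}

The returned cursor points just past the 16-byte HLOO quad, so a caller can continue encoding the next list item in place.
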
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Read chunklist (a linked list):
+ *   N elements, position P (same P for all chunks of same arg!):
+ *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+                         struct rpcrdma_req *req, struct rpc_rqst *rqst,
+                         __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        unsigned int pos;
+        int n, nsegs;
+
+        if (rtype == rpcrdma_noch) {
+                *iptr++ = xdr_zero;     /* item not present */
+                return iptr;
+        }
+
+        pos = rqst->rq_snd_buf.head[0].iov_len;
+        if (rtype == rpcrdma_areadch)
+                pos = 0;
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        if (nsegs < 0)
+                return ERR_PTR(nsegs);
+
+        do {
+                n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+                if (n <= 0)
+                        return ERR_PTR(n);
+
+                *iptr++ = xdr_one;      /* item present */
+
+                /* All read segments in this chunk
+                 * have the same "position".
+                 */
+                *iptr++ = cpu_to_be32(pos);
+                iptr = xdr_encode_rdma_segment(iptr, seg);
+
+                dprintk("RPC: %5u %s: read segment pos %u "
+                        "%d@0x%016llx:0x%08x (%s)\n",
+                        rqst->rq_task->tk_pid, __func__, pos,
+                        seg->mr_len, (unsigned long long)seg->mr_base,
+                        seg->mr_rkey, n < nsegs ? "more" : "last");
+
+                r_xprt->rx_stats.read_chunk_count++;
+                req->rl_nchunks++;
+                seg += n;
+                nsegs -= n;
+        } while (nsegs);
+        req->rl_nextseg = seg;
+
+        /* Finish Read list */
+        *iptr++ = xdr_zero;     /* Next item not present */
+        return iptr;
+}
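
Concretely, a Read chunk that this function splits across two registered segments at XDR position 36 comes out as the following sequence of XDR items (handles, lengths, and offsets invented for illustration):

        1                                       /* item present */
        36                                      /* position */
        0xdeadbeef  4096  0x0000000010000000    /* segment 1: HLOO */
        1                                       /* item present */
        36                                      /* same position */
        0xcafef00d  1024  0x0000000010001000    /* segment 2: HLOO */
        0                                       /* no further items */

Both entries carry the same position because they are pieces of one chunk; a chunk for a different argument would use a different position.
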
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Write chunklist (a list of (one) counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+                          struct rpc_rqst *rqst, __be32 *iptr,
+                          enum rpcrdma_chunktype wtype)
+{
+        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        int n, nsegs, nchunks;
+        __be32 *segcount;
+
+        if (wtype != rpcrdma_writech) {
+                *iptr++ = xdr_zero;     /* no Write list present */
+                return iptr;
+        }
+
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+                                     rqst->rq_rcv_buf.head[0].iov_len,
+                                     wtype, seg,
+                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        if (nsegs < 0)
+                return ERR_PTR(nsegs);
+
+        *iptr++ = xdr_one;      /* Write list present */
+        segcount = iptr++;      /* save location of segment count */
+
+        nchunks = 0;
+        do {
+                n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+                if (n <= 0)
+                        return ERR_PTR(n);
+
+                iptr = xdr_encode_rdma_segment(iptr, seg);
+
+                dprintk("RPC: %5u %s: write segment "
+                        "%d@0x%016llx:0x%08x (%s)\n",
+                        rqst->rq_task->tk_pid, __func__,
+                        seg->mr_len, (unsigned long long)seg->mr_base,
+                        seg->mr_rkey, n < nsegs ? "more" : "last");
+
+                r_xprt->rx_stats.write_chunk_count++;
+                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+                req->rl_nchunks++;
+                nchunks++;
+                seg += n;
+                nsegs -= n;
+        } while (nsegs);
+        req->rl_nextseg = seg;
+
+        /* Update count of segments in this Write chunk */
+        *segcount = cpu_to_be32(nchunks);
+
+        /* Finish Write list */
+        *iptr++ = xdr_zero;     /* Next item not present */
+        return iptr;
+}
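
By contrast with the Read list, a Write chunk is a counted array, so the same two invented segments encode as:

        1                                       /* Write list present */
        2                                       /* segment count */
        0xdeadbeef  4096  0x0000000010000000    /* segment 1: HLOO */
        0xcafef00d  1024  0x0000000010001000    /* segment 2: HLOO */
        0                                       /* no further Write chunks */

The segment count is not known until the mapping loop finishes, which is why the code saves its location in segcount and backfills it afterward.
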
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Reply chunk (a counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+                           struct rpcrdma_req *req, struct rpc_rqst *rqst,
+                           __be32 *iptr, enum rpcrdma_chunktype wtype)
+{
+        struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+        int n, nsegs, nchunks;
+        __be32 *segcount;
+
+        if (wtype != rpcrdma_replych) {
+                *iptr++ = xdr_zero;     /* no Reply chunk present */
+                return iptr;
+        }
+
+        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+                                     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+        if (nsegs < 0)
+                return ERR_PTR(nsegs);
+
+        *iptr++ = xdr_one;      /* Reply chunk present */
+        segcount = iptr++;      /* save location of segment count */
+
+        nchunks = 0;
+        do {
+                n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+                if (n <= 0)
+                        return ERR_PTR(n);
+
+                iptr = xdr_encode_rdma_segment(iptr, seg);
+
+                dprintk("RPC: %5u %s: reply segment "
+                        "%d@0x%016llx:0x%08x (%s)\n",
+                        rqst->rq_task->tk_pid, __func__,
+                        seg->mr_len, (unsigned long long)seg->mr_base,
+                        seg->mr_rkey, n < nsegs ? "more" : "last");
+
+                r_xprt->rx_stats.reply_chunk_count++;
+                r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+                req->rl_nchunks++;
+                nchunks++;
+                seg += n;
+                nsegs -= n;
+        } while (nsegs);
+        req->rl_nextseg = seg;
+
+        /* Update count of segments in the Reply chunk */
+        *segcount = cpu_to_be32(nchunks);
+
+        return iptr;
+}
+
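
The Reply chunk encodes like a single Write chunk (again, segment values invented for illustration), but it is one optional item rather than a list, so no terminating discriminator follows the last segment — which is why this encoder, unlike the two above, emits no trailing xdr_zero:

        1                                       /* Reply chunk present */
        2                                       /* segment count */
        0xdeadbeef  4096  0x0000000010000000    /* segment 1: HLOO */
        0xcafef00d  1024  0x0000000010001000    /* segment 2: HLOO */
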
 /*
  * Copy write data inline.
  * This function is used for "small" requests. Data which is passed
@@ -508,24 +716,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         struct rpc_xprt *xprt = rqst->rq_xprt;
         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-        char *base;
-        size_t rpclen;
-        ssize_t hdrlen;
         enum rpcrdma_chunktype rtype, wtype;
         struct rpcrdma_msg *headerp;
+        unsigned int pos;
+        ssize_t hdrlen;
+        size_t rpclen;
+        __be32 *iptr;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
         if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
                 return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-        /*
-         * rpclen gets amount of data in first buffer, which is the
-         * pre-registered buffer.
-         */
-        base = rqst->rq_svec[0].iov_base;
-        rpclen = rqst->rq_svec[0].iov_len;
-
         headerp = rdmab_to_msg(req->rl_rdmabuf);
         /* don't byte-swap XID, it's already done in request */
         headerp->rm_xid = rqst->rq_xid;
@@ -565,61 +767,62 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
          */
         if (rpcrdma_args_inline(r_xprt, rqst)) {
                 rtype = rpcrdma_noch;
+                rpcrdma_inline_pullup(rqst);
+                rpclen = rqst->rq_svec[0].iov_len;
         } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
                 rtype = rpcrdma_readch;
+                rpclen = rqst->rq_svec[0].iov_len;
+                rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
         } else {
                 r_xprt->rx_stats.nomsg_call_count++;
                 headerp->rm_type = htonl(RDMA_NOMSG);
                 rtype = rpcrdma_areadch;
                 rpclen = 0;
         }
 
-        /* The following simplification is not true forever */
-        if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
-                wtype = rpcrdma_noch;
-        if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
-                dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
-                        __func__);
-                return -EIO;
-        }
-
-        hdrlen = RPCRDMA_HDRLEN_MIN;
-
-        /*
-         * Pull up any extra send data into the preregistered buffer.
-         * When padding is in use and applies to the transfer, insert
-         * it and change the message type.
+        /* This implementation supports the following combinations
+         * of chunk lists in one RPC-over-RDMA Call message:
+         *
+         *   - Read list
+         *   - Write list
+         *   - Reply chunk
+         *   - Read list + Reply chunk
+         *
+         * It might not yet support the following combinations:
+         *
+         *   - Read list + Write list
+         *
+         * It does not support the following combinations:
+         *
+         *   - Write list + Reply chunk
+         *   - Read list + Write list + Reply chunk
+         *
+         * This implementation supports only a single chunk in each
+         * Read or Write list. Thus for example the client cannot
+         * send a Call message with a Position Zero Read chunk and a
+         * regular Read chunk at the same time.
          */
-        if (rtype == rpcrdma_noch) {
-
-                rpcrdma_inline_pullup(rqst);
-
-                headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
-                headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
-                headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
-                /* new length after pullup */
-                rpclen = rqst->rq_svec[0].iov_len;
-        } else if (rtype == rpcrdma_readch)
-                rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
-        if (rtype != rpcrdma_noch) {
-                hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
-                                               headerp, rtype);
-                wtype = rtype;  /* simplify dprintk */
-
-        } else if (wtype != rpcrdma_noch) {
-                hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
-                                               headerp, wtype);
-        }
-        if (hdrlen < 0)
-                return hdrlen;
+        req->rl_nchunks = 0;
+        req->rl_nextseg = req->rl_segments;
+        iptr = headerp->rm_body.rm_chunks;
+        iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+        if (IS_ERR(iptr))
+                goto out_unmap;
+        iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+        if (IS_ERR(iptr))
+                goto out_unmap;
+        iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+        if (IS_ERR(iptr))
+                goto out_unmap;
+        hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
         if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
                 goto out_overflow;
 
-        dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
-                " headerp 0x%p base 0x%p lkey 0x%x\n",
-                __func__, transfertypes[wtype], hdrlen, rpclen,
-                headerp, base, rdmab_lkey(req->rl_rdmabuf));
+        dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+                rqst->rq_task->tk_pid, __func__,
+                transfertypes[rtype], transfertypes[wtype],
+                hdrlen, rpclen);
 
         req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
         req->rl_send_iov[0].length = hdrlen;
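
Putting the three encoders together: for a Call that moves its entire argument via a Position Zero Read chunk and expects a large reply (rtype = rpcrdma_areadch, wtype = rpcrdma_replych, one of the newly supported combinations), and assuming a single segment per chunk, the marshaled header would be laid out as:

        XID  VERS  CREDITS  RDMA_NOMSG          /* fixed fields, 16 bytes */
        1  0  HLOO  0                           /* Read list, 28 bytes */
        0                                       /* Write list absent, 4 bytes */
        1  1  HLOO                              /* Reply chunk, 24 bytes */

iptr then ends up 72 bytes past headerp, so hdrlen is 72, and rpclen is 0 because the message body travels entirely in the Read chunk.
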
@@ -637,12 +840,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         return 0;
 
 out_overflow:
-        pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s\n",
-                hdrlen, rpclen, transfertypes[wtype]);
+        pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+                hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
         /* Terminate this RPC. Chunks registered above will be
          * released by xprt_release -> xprt_rdma_free.
          */
         return -EIO;
+
+out_unmap:
+        for (pos = 0; req->rl_nchunks--;)
+                pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+                                                      &req->rl_segments[pos]);
+        return PTR_ERR(iptr);
 }
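
The out_unmap path works because each encoder reports failure through its cursor using the kernel's error-pointer idiom: a small negative errno cast to a pointer, tested with IS_ERR() and recovered with PTR_ERR(). For readers unfamiliar with the pattern, here is a minimal userspace analog (toy definitions, not the kernel's):

#include <stdio.h>
#include <errno.h>

#define MAX_ERRNO       4095

static void *ERR_PTR(long error) { return (void *)error; }
static long PTR_ERR(const void *ptr) { return (long)ptr; }
static int IS_ERR(const void *ptr)
{
        /* the top page of the address space is reserved for errnos */
        return (unsigned long)ptr >= (unsigned long)-MAX_ERRNO;
}

/* Toy encoder: advances the cursor on success, smuggles an
 * errno through the pointer on failure.
 */
static unsigned int *encode_item(unsigned int *cursor, int fail)
{
        if (fail)
                return ERR_PTR(-EIO);
        *cursor++ = 42;
        return cursor;
}

int main(void)
{
        unsigned int buf[4];
        unsigned int *p = encode_item(buf, 1);

        if (IS_ERR(p))
                printf("encode failed: %ld\n", PTR_ERR(p));     /* -5 */
        return 0;
}

A single return channel thus carries both the advanced cursor and the failure code, which keeps the three encode calls in rpcrdma_marshal_req() uniform.
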
 
 /*