@@ -21,7 +21,10 @@
/*
* Copyright 2010 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
+ */
+/*
* Copyright 2012 Milan Jurik. All rights reserved.
+ * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
*/
/* Copyright (c) 1990 Mentat Inc. */
@@ -231,9 +234,6 @@ static void mir_wsrv(queue_t *q);
static struct module_info rpcmod_info =
{RPCMOD_ID, " rpcmod" , 0 , INFPSZ, 256 *1024 , 1024 };
-/*
- * Read side has no service procedure.
- */
static struct qinit rpcmodrinit = {
(int (*)())rmm_rput,
(int (*)())rmm_rsrv,
@@ -269,6 +269,9 @@ struct xprt_style_ops {
void (*xo_rsrv)();
};
+/*
+ * Read side has no service procedure.
+ */
static struct xprt_style_ops xprt_clts_ops = {
rpcmodopen,
rpcmodclose,
@@ -291,7 +294,7 @@ static struct xprt_style_ops xprt_cots_ops = {
* Per rpcmod "slot" data structure. q->q_ptr points to one of these.
*/
struct rpcm {
- void *rm_krpc_cell; /* Reserved for use by KRPC */
+ void *rm_krpc_cell; /* Reserved for use by kRPC */
struct xprt_style_ops *rm_ops;
int rm_type; /* Client or server side stream */
#define RM_CLOSING 0x1 /* somebody is trying to close slot */
@@ -312,7 +315,7 @@ struct temp_slot {
};
typedef struct mir_s {
- void *mir_krpc_cell; /* Reserved for KRPC use. This field */
+ void *mir_krpc_cell; /* Reserved for kRPC use. This field */
/* must be first in the structure. */
struct xprt_style_ops *rm_ops;
int mir_type; /* Client or server side stream */
@@ -362,7 +365,7 @@ typedef struct mir_s {
* to 1 whenever a new request is sent out (mir_wput)
* and cleared when the timer fires (mir_timer). If
* the timer fires with this value equal to 0, then the
- * stream is considered idle and KRPC is notified.
+ * stream is considered idle and kRPC is notified.
*/
mir_clntreq : 1 ,
/*
@@ -404,9 +407,9 @@ typedef struct mir_s {
/* that a kernel RPC server thread */
/* (see svc_run()) has on this rpcmod */
/* slot. Effectively, it is the */
- /* number * of unprocessed messages */
+ /* number of unprocessed messages */
/* that have been passed up to the */
- /* KRPC layer */
+ /* kRPC layer */
mblk_t *mir_svc_pend_mp; /* Pending T_ORDREL_IND or */
/* T_DISCON_IND */
@@ -567,7 +570,7 @@ rmm_close(queue_t *q, int flag, cred_t *crp)
return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
}
-static void rpcmod_release (queue_t *, mblk_t *);
+static void rpcmod_release (queue_t *, mblk_t *, bool_t );
/*
* rpcmodopen - open routine gets called when the module gets pushed
* onto the stream.
@@ -578,7 +581,7 @@ rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
{
struct rpcm *rmp;
- extern void (*rpc_rele)(queue_t *, mblk_t *);
+ extern void (*rpc_rele)(queue_t *, mblk_t *, bool_t );
TRACE_0 (TR_FAC_KRPC, TR_RPCMODOPEN_START, " rpcmodopen_start:" );
@@ -687,13 +690,6 @@ rpcmodclose(queue_t *q, int flag, cred_t *crp)
return (0 );
}
-#ifdef DEBUG
-int rpcmod_send_msg_up = 0 ;
-int rpcmod_send_uderr = 0 ;
-int rpcmod_send_dup = 0 ;
-int rpcmod_send_dup_cnt = 0 ;
-#endif
-
/*
* rpcmodrput - Module read put procedure. This is called from
* the module, driver, or stream head downstream.
@@ -715,54 +711,6 @@ rpcmodrput(queue_t *q, mblk_t *mp)
return ;
}
-#ifdef DEBUG
- if (rpcmod_send_msg_up > 0 ) {
- mblk_t *nmp = copymsg (mp);
- if (nmp) {
- putnext (q, nmp);
- rpcmod_send_msg_up--;
- }
- }
- if ((rpcmod_send_uderr > 0 ) && mp->b_datap->db_type == M_PROTO) {
- mblk_t *nmp;
- struct T_unitdata_ind *data;
- struct T_uderror_ind *ud;
- int d;
- data = (struct T_unitdata_ind *)mp->b_rptr;
- if (data->PRIM_type == T_UNITDATA_IND) {
- d = sizeof (*ud) - sizeof (*data);
- nmp = allocb (mp->b_wptr - mp->b_rptr + d, BPRI_HI);
- if (nmp) {
- ud = (struct T_uderror_ind *)nmp->b_rptr;
- ud->PRIM_type = T_UDERROR_IND;
- ud->DEST_length = data->SRC_length;
- ud->DEST_offset = data->SRC_offset + d;
- ud->OPT_length = data->OPT_length;
- ud->OPT_offset = data->OPT_offset + d;
- ud->ERROR_type = ENETDOWN;
- if (data->SRC_length) {
- bcopy (mp->b_rptr +
- data->SRC_offset,
- nmp->b_rptr +
- ud->DEST_offset,
- data->SRC_length);
- }
- if (data->OPT_length) {
- bcopy (mp->b_rptr +
- data->OPT_offset,
- nmp->b_rptr +
- ud->OPT_offset,
- data->OPT_length);
- }
- nmp->b_wptr += d;
- nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
- nmp->b_datap->db_type = M_PROTO;
- putnext (q, nmp);
- rpcmod_send_uderr--;
- }
- }
- }
-#endif
switch (mp->b_datap->db_type) {
default :
putnext (q, mp);
@@ -774,14 +722,12 @@ rpcmodrput(queue_t *q, mblk_t *mp)
pptr = (union T_primitives *)mp->b_rptr;
/*
- * Forward this message to krpc if it is data.
+ * Forward this message to kRPC if it is data.
*/
if (pptr->type == T_UNITDATA_IND) {
- mblk_t *nmp;
-
- /*
- * Check if the module is being popped.
- */
+ /*
+ * Check if the module is being popped.
+ */
mutex_enter (&rmp->rm_lock);
if (rmp->rm_state & RM_CLOSING) {
mutex_exit (&rmp->rm_lock);
@@ -818,47 +764,21 @@ rpcmodrput(queue_t *q, mblk_t *mp)
case RPC_SERVER:
/*
* rm_krpc_cell is exclusively used by the kRPC
- * CLTS server
+ * CLTS server. Try to submit the message to
+ * kRPC. Since this is an unreliable channel, we
+ * can just free the message in case the kRPC
+ * does not accept new messages.
*/
- if (rmp->rm_krpc_cell) {
-#ifdef DEBUG
- /*
- * Test duplicate request cache and
- * rm_ref count handling by sending a
- * duplicate every so often, if
- * desired.
- */
- if (rpcmod_send_dup &&
- rpcmod_send_dup_cnt++ %
- rpcmod_send_dup)
- nmp = copymsg (mp);
- else
- nmp = NULL ;
-#endif
+ if (rmp->rm_krpc_cell &&
+ svc_queuereq (q, mp, TRUE )) {
/*
* Raise the reference count on this
* module to prevent it from being
- * popped before krpc generates the
+ * popped before kRPC generates the
* reply.
*/
rmp->rm_ref++;
mutex_exit (&rmp->rm_lock);
-
- /*
- * Submit the message to krpc.
- */
- svc_queuereq (q, mp);
-#ifdef DEBUG
- /*
- * Send duplicate if we created one.
- */
- if (nmp) {
- mutex_enter (&rmp->rm_lock);
- rmp->rm_ref++;
- mutex_exit (&rmp->rm_lock);
- svc_queuereq (q, nmp);
- }
-#endif
} else {
mutex_exit (&rmp->rm_lock);
freemsg (mp);
@@ -1030,8 +950,9 @@ rpcmodwsrv(queue_t *q)
}
}
+/* ARGSUSED */
static void
-rpcmod_release (queue_t *q, mblk_t *bp)
+rpcmod_release (queue_t *q, mblk_t *bp, bool_t enable )
{
struct rpcm *rmp;
@@ -1084,7 +1005,7 @@ rpcmod_release(queue_t *q, mblk_t *bp)
static int mir_clnt_dup_request (queue_t *q, mblk_t *mp);
static void mir_rput_proto (queue_t *q, mblk_t *mp);
static int mir_svc_policy_notify (queue_t *q, int event);
-static void mir_svc_release (queue_t *wq, mblk_t *mp);
+static void mir_svc_release (queue_t *wq, mblk_t *mp, bool_t );
static void mir_svc_start (queue_t *wq);
static void mir_svc_idle_start (queue_t *, mir_t *);
static void mir_svc_idle_stop (queue_t *, mir_t *);
@@ -1099,7 +1020,7 @@ static void mir_disconnect(queue_t *, mir_t *ir);
static int mir_check_len (queue_t *, int32_t , mblk_t *);
static void mir_timer (void *);
-extern void (*mir_rele)(queue_t *, mblk_t *);
+extern void (*mir_rele)(queue_t *, mblk_t *, bool_t );
extern void (*mir_start)(queue_t *);
extern void (*clnt_stop_idle)(queue_t *);
@@ -1256,7 +1177,7 @@ mir_close(queue_t *q)
mutex_exit (&mir->mir_mutex);
qprocsoff (q);
- /* Notify KRPC that this stream is going away. */
+ /* Notify kRPC that this stream is going away. */
svc_queueclose (q);
} else {
mutex_exit (&mir->mir_mutex);
@@ -1337,7 +1258,7 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
mir_t *mir;
RPCLOG (32 , " rpcmod: mir_open of q 0x%p \n " , (void *)q);
- /* Set variables used directly by KRPC . */
+ /* Set variables used directly by kRPC . */
if (!mir_rele)
mir_rele = mir_svc_release;
if (!mir_start)
@@ -1357,15 +1278,15 @@ mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
* be held on the read-side queue until the stream is completely
* initialized with a RPC_CLIENT or RPC_SERVER ioctl. During
* the ioctl processing, the flag is cleared and any messages that
- * arrived between the open and the ioctl are delivered to KRPC .
+ * arrived between the open and the ioctl are delivered to kRPC .
*
* Early data should never arrive on a client stream since
* servers only respond to our requests and we do not send any.
* until after the stream is initialized. Early data is
* very common on a server stream where the client will start
* sending data as soon as the connection is made (and this
* is especially true with TCP where the protocol accepts the
- * connection before nfsd or KRPC is notified about it).
+ * connection before nfsd or kRPC is notified about it).
*/
mir->mir_hold_inbound = 1 ;
@@ -1420,7 +1341,7 @@ mir_rput(queue_t *q, mblk_t *mp)
* If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
* with the corresponding ioctl, then don't accept
* any inbound data. This should never happen for streams
- * created by nfsd or client-side KRPC because they are careful
+ * created by nfsd or client-side kRPC because they are careful
* to set the mode of the stream before doing anything else.
*/
if (mir->mir_type == 0 ) {
@@ -1456,7 +1377,7 @@ mir_rput(queue_t *q, mblk_t *mp)
* If a module on the stream is trying set the Stream head's
* high water mark, then set our hiwater to the requested
* value. We are the "stream head" for all inbound
- * data messages since messages are passed directly to KRPC .
+ * data messages since messages are passed directly to kRPC .
*/
if (MBLKL (mp) >= sizeof (struct stroptions)) {
struct stroptions *stropts;
@@ -1629,23 +1550,32 @@ mir_rput(queue_t *q, mblk_t *mp)
case RPC_SERVER:
/*
* Check for flow control before passing the
- * message to KRPC .
+ * message to kRPC .
*/
if (!mir->mir_hold_inbound) {
if (mir->mir_krpc_cell) {
- /*
- * If the reference count is 0
- * (not including this request),
- * then the stream is transitioning
- * from idle to non-idle. In this case,
- * we cancel the idle timer.
- */
- if (mir->mir_ref_cnt++ == 0 )
- stop_timer = B_TRUE;
+
if (mir_check_len (q,
- (int32_t )msgdsize (mp), mp))
+ (int32_t )msgdsize (head_mp),
+ head_mp))
return ;
- svc_queuereq (q, head_mp); /* to KRPC */
+
+ if (q->q_first == NULL &&
+ svc_queuereq (q, head_mp, TRUE )) {
+ /*
+ * If the reference count is 0
+ * (not including this
+ * request), then the stream is
+ * transitioning from idle to
+ * non-idle. In this case, we
+ * cancel the idle timer.
+ */
+ if (mir->mir_ref_cnt++ == 0 )
+ stop_timer = B_TRUE;
+ } else {
+ (void ) putq (q, head_mp);
+ mir->mir_inrservice = B_TRUE;
+ }
} else {
/*
* Count # of times this happens. Should
@@ -1811,7 +1741,7 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
break ;
default :
RPCLOG (1 , " mir_rput: unexpected message %d "
- " for KRPC client\n " ,
+ " for kRPC client\n " ,
((union T_primitives *)mp->b_rptr)->type);
break ;
}
@@ -1925,37 +1855,12 @@ mir_rput_proto(queue_t *q, mblk_t *mp)
* outbound flow control is exerted. When outbound flow control is
* relieved, mir_wsrv qenables the read-side queue. Read-side queues
* are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
- *
- * For the server side, we have two types of messages queued. The first type
- * are messages that are ready to be XDR decoded and and then sent to the
- * RPC program's dispatch routine. The second type are "raw" messages that
- * haven't been processed, i.e. assembled from rpc record fragements into
- * full requests. The only time we will see the second type of message
- * queued is if we have a memory allocation failure while processing a
- * a raw message. The field mir_first_non_processed_mblk will mark the
- * first such raw message. So the flow for server side is:
- *
- * - send processed queued messages to kRPC until we run out or find
- * one that needs additional processing because we were short on memory
- * earlier
- * - process a message that was deferred because of lack of
- * memory
- * - continue processing messages until the queue empties or we
- * have to stop because of lack of memory
- * - during each of the above phase, if the queue is empty and
- * there are no pending messages that were passed to the RPC
- * layer, send upstream the pending disconnect/ordrel indication if
- * there is one
- *
- * The read-side queue is also enabled by a bufcall callback if dupmsg
- * fails in mir_rput.
*/
static void
mir_rsrv (queue_t *q)
{
mir_t *mir;
mblk_t *mp;
- mblk_t *cmp = NULL ;
boolean_t stop_timer = B_FALSE;
mir = (mir_t *)q->q_ptr;
@@ -1966,43 +1871,28 @@ mir_rsrv(queue_t *q)
case RPC_SERVER:
if (mir->mir_ref_cnt == 0 )
mir->mir_hold_inbound = 0 ;
- if (mir->mir_hold_inbound) {
-
- ASSERT (cmp == NULL );
- if (q->q_first == NULL ) {
-
- MIR_CLEAR_INRSRV (mir);
-
- if (MIR_SVC_QUIESCED (mir)) {
- cmp = mir->mir_svc_pend_mp;
- mir->mir_svc_pend_mp = NULL ;
- }
- }
-
- mutex_exit (&mir->mir_mutex);
-
- if (cmp != NULL ) {
- RPCLOG (16 , " mir_rsrv: line %d : sending a held "
- " disconnect/ord rel indication upstream\n " ,
- __LINE__);
- putnext (q, cmp);
- }
+ if (mir->mir_hold_inbound)
+ break ;
- return ;
- }
while (mp = getq (q)) {
if (mir->mir_krpc_cell &&
(mir->mir_svc_no_more_msgs == 0 )) {
- /*
- * If we were idle, turn off idle timer since
- * we aren't idle any more.
- */
- if (mir->mir_ref_cnt++ == 0 )
- stop_timer = B_TRUE;
+
if (mir_check_len (q,
(int32_t )msgdsize (mp), mp))
return ;
- svc_queuereq (q, mp);
+
+ if (svc_queuereq (q, mp, TRUE )) {
+ /*
+ * If we were idle, turn off idle timer
+ * since we aren't idle any more.
+ */
+ if (mir->mir_ref_cnt++ == 0 )
+ stop_timer = B_TRUE;
+ } else {
+ (void ) putbq (q, mp);
+ break ;
+ }
} else {
/*
* Count # of times this happens. Should be
@@ -2041,10 +1931,10 @@ mir_rsrv(queue_t *q)
}
if (q->q_first == NULL ) {
+ mblk_t *cmp = NULL ;
MIR_CLEAR_INRSRV (mir);
- ASSERT (cmp == NULL );
if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED (mir)) {
cmp = mir->mir_svc_pend_mp;
mir->mir_svc_pend_mp = NULL ;
@@ -2111,15 +2001,14 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
ASSERT ((wq->q_flag & QREADR) == 0 );
ASSERT (mir->mir_type == RPC_SERVER);
-
/*
* Do not accept any more messages.
*/
mir->mir_svc_no_more_msgs = 1 ;
/*
- * Next two statements will make the read service procedure invoke
- * svc_queuereq() on everything stuck in the streams read queue.
+ * Next two statements will make the read service procedure
+ * free everything stuck in the streams read queue.
* It's not necessary because enabling the write queue will
* have the same effect, but why not speed the process along?
*/
@@ -2134,11 +2023,11 @@ mir_svc_start_close(queue_t *wq, mir_t *mir)
}
/*
- * This routine is called directly by KRPC after a request is completed,
+ * This routine is called directly by kRPC after a request is completed,
* whether a reply was sent or the request was dropped.
*/
static void
-mir_svc_release (queue_t *wq, mblk_t *mp)
+mir_svc_release (queue_t *wq, mblk_t *mp, bool_t enable )
{
mir_t *mir = (mir_t *)wq->q_ptr;
mblk_t *cmp = NULL ;
@@ -2147,6 +2036,9 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
if (mp)
freemsg (mp);
+ if (enable)
+ qenable (RD (wq));
+
mutex_enter (&mir->mir_mutex);
/*
@@ -2194,7 +2086,7 @@ mir_svc_release(queue_t *wq, mblk_t *mp)
}
/*
- * This routine is called by server-side KRPC when it is ready to
+ * This routine is called by server-side kRPC when it is ready to
* handle inbound messages on the stream.
*/
static void
@@ -2286,7 +2178,7 @@ mir_timer(void *arg)
* For clients, the timer fires at clnt_idle_timeout
* intervals. If the activity marker (mir_clntreq) is
* zero, then the stream has been idle since the last
- * timer event and we notify KRPC . If mir_clntreq is
+ * timer event and we notify kRPC . If mir_clntreq is
* non-zero, then the stream is active and we just
* restart the timer for another interval. mir_clntreq
* is set to 1 in mir_wput for every request passed
@@ -2337,10 +2229,10 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
mutex_exit (&mir->mir_mutex);
/*
* We pass T_ORDREL_REQ as an integer value
- * to KRPC as the indication that the stream
+ * to kRPC as the indication that the stream
* is idle. This is not a T_ORDREL_REQ message,
* it is just a convenient value since we call
- * the same KRPC routine for T_ORDREL_INDs and
+ * the same kRPC routine for T_ORDREL_INDs and
* T_DISCON_INDs.
*/
clnt_dispatch_notifyall (wq, T_ORDREL_REQ, 0 );
@@ -2354,7 +2246,7 @@ printf("mir_timer[%d]: doing client timeout\n", now / hz);
* by mir_wput when mir_type is set to RPC_SERVER and
* by mir_svc_idle_start whenever the stream goes idle
* (mir_ref_cnt == 0). The timer is cancelled in
- * mir_rput whenever a new inbound request is passed to KRPC
+ * mir_rput whenever a new inbound request is passed to kRPC
* and the stream was previously idle.
*
* The timer interval can be changed for individual
@@ -2424,12 +2316,12 @@ mir_wput(queue_t *q, mblk_t *mp)
!IS_P2ALIGNED (mp->b_rptr, sizeof (uint32_t ))) {
/*
* Since we know that M_DATA messages are created exclusively
- * by KRPC , we expect that KRPC will leave room for our header
+ * by kRPC , we expect that kRPC will leave room for our header
* and 4 byte align which is normal for XDR.
- * If KRPC (or someone else) does not cooperate, then we
+ * If kRPC (or someone else) does not cooperate, then we
* just throw away the message.
*/
- RPCLOG (1 , " mir_wput: KRPC did not leave space for record "
+ RPCLOG (1 , " mir_wput: kRPC did not leave space for record "
" fragment header (%d bytes left)\n " ,
(int )(rptr - mp->b_datap->db_base));
freemsg (mp);
@@ -2650,7 +2542,7 @@ mir_wput_other(queue_t *q, mblk_t *mp)
/*
* If the stream is not idle, then we hold the
* orderly release until it becomes idle. This
- * ensures that KRPC will be able to reply to
+ * ensures that kRPC will be able to reply to
* all requests that we have passed to it.
*
* We also queue the request if there is data already
@@ -2896,10 +2788,10 @@ mir_disconnect(queue_t *q, mir_t *mir)
mutex_exit (&mir->mir_mutex);
/*
- * T_DISCON_REQ is passed to KRPC as an integer value
+ * T_DISCON_REQ is passed to kRPC as an integer value
* (this is not a TPI message). It is used as a
* convenient value to indicate a sanity check
- * failure -- the same KRPC routine is also called
+ * failure -- the same kRPC routine is also called
* for T_DISCON_INDs and T_ORDREL_INDs.
*/
clnt_dispatch_notifyall (WR (q), T_DISCON_REQ, 0 );
@@ -2944,7 +2836,7 @@ mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
mir->mir_frag_len = -(int32_t )sizeof (uint32_t );
if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
cmn_err (CE_NOTE,
- " KRPC : record fragment from %s of size(%d ) exceeds "
+ " kRPC : record fragment from %s of size(%d ) exceeds "
" maximum (%u ). Disconnecting" ,
(mir->mir_type == RPC_CLIENT) ? " server" :
(mir->mir_type == RPC_SERVER) ? " client" :
0 comments on commit
2695d4f