rxrpc: Move client call connection to the I/O thread
Move the connection setup of client calls to the I/O thread so that a whole
load of locking and barrierage can be eliminated.  This necessitates the
app thread waiting for connection to complete before it can begin
encrypting data.

Signed-off-by: David Howells <dhowells@redhat.com>
dhowells committed Nov 3, 2022
1 parent f2743e8 commit 1f5e9c5
Showing 12 changed files with 275 additions and 515 deletions.
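The shape of the change is easiest to see as a producer/consumer handoff: rxrpc_connect_call() (in call_object.c below) queues a new call on local->new_client_calls under local->client_call_lock, and the I/O thread later binds the call to a connection while the app thread sleeps until the call leaves its connect-wait state. The following is a minimal userspace analogue of that pattern, using pthreads as stand-ins for the kernel's list, lock and waitqueue primitives; it is illustrative only, not kernel code.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct call {
	struct call *next;	/* stand-in for call->wait_link */
	bool connected;		/* stand-in for the call's connection state */
};

static struct call *new_client_calls;	/* cf. local->new_client_calls */
static pthread_mutex_t client_call_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t call_waitq = PTHREAD_COND_INITIALIZER;

/* App-thread side, cf. rxrpc_connect_call(): queue the call for the
 * I/O thread, then sleep until it has been attached to a connection. */
static void connect_call(struct call *call)
{
	pthread_mutex_lock(&client_call_lock);
	call->next = new_client_calls;
	new_client_calls = call;
	while (!call->connected)
		pthread_cond_wait(&call_waitq, &client_call_lock);
	pthread_mutex_unlock(&client_call_lock);
}

/* I/O-thread side: pop queued calls and "connect" them.  Only this
 * thread touches connection state, so no further locking is needed;
 * the real I/O thread is event-driven rather than polling like this. */
static void *io_thread(void *unused)
{
	(void)unused;
	for (;;) {
		pthread_mutex_lock(&client_call_lock);
		struct call *call = new_client_calls;
		if (call) {
			new_client_calls = call->next;
			call->connected = true;
			pthread_cond_broadcast(&call_waitq);
		}
		pthread_mutex_unlock(&client_call_lock);
		if (call)
			return NULL;	/* one call suffices for the demo */
	}
}

int main(void)
{
	struct call call = { .next = NULL, .connected = false };
	pthread_t io;

	pthread_create(&io, NULL, io_thread, NULL);
	connect_call(&call);
	pthread_join(io, NULL);
	printf("connected: %d\n", call.connected);
	return 0;
}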
2 changes: 2 additions & 0 deletions include/trace/events/rxrpc.h
@@ -255,6 +255,7 @@
 	EM(rxrpc_call_get_release_sock,		"GET rel-sock") \
 	EM(rxrpc_call_get_sendmsg,		"GET sendmsg ") \
 	EM(rxrpc_call_get_userid,		"GET user-id ") \
+	EM(rxrpc_call_get_wait_connect,		"GET wait-con") \
 	EM(rxrpc_call_new_client,		"NEW client  ") \
 	EM(rxrpc_call_new_prealloc_service,	"NEW prealloc") \
 	EM(rxrpc_call_put_discard_prealloc,	"PUT disc-pre") \
@@ -269,6 +270,7 @@
 	EM(rxrpc_call_put_sendmsg,		"PUT sendmsg ") \
 	EM(rxrpc_call_put_unnotify,		"PUT unnotify") \
 	EM(rxrpc_call_put_userid_exists,	"PUT u-exists") \
+	EM(rxrpc_call_put_userid,		"PUT user-id ") \
 	EM(rxrpc_call_see_accept,		"SEE accept  ") \
 	EM(rxrpc_call_see_activate_client,	"SEE act-clnt") \
 	EM(rxrpc_call_see_connect_failed,	"SEE con-fail") \
21 changes: 11 additions & 10 deletions net/rxrpc/ar-internal.h
@@ -290,7 +290,6 @@ struct rxrpc_local {
 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 	bool			kill_all_client_conns;
-	spinlock_t		client_conn_cache_lock; /* Lock for ->*_client_conns */
 	struct list_head	idle_client_conns;
 	struct timer_list	client_conn_reap_timer;
 	unsigned long		client_conn_flags;
@@ -302,7 +301,8 @@ struct rxrpc_local {
 	bool			dead;
 	bool			service_closed;	/* Service socket closed */
 	struct idr		conn_ids;	/* List of connection IDs */
-	spinlock_t		conn_lock;	/* Lock for client connection pool */
+	struct list_head	new_client_calls; /* Newly created client calls need connection */
+	spinlock_t		client_call_lock; /* Lock for ->new_client_calls */
 	struct sockaddr_rxrpc	srx;		/* local address */
 };
@@ -383,7 +383,6 @@ enum rxrpc_call_completion {
  * Bits in the connection flags.
  */
 enum rxrpc_conn_flag {
-	RXRPC_CONN_HAS_IDR,		/* Has a client conn ID assigned */
 	RXRPC_CONN_IN_SERVICE_CONNS,	/* Conn is in peer->service_conns */
 	RXRPC_CONN_DONT_REUSE,		/* Don't reuse this connection */
 	RXRPC_CONN_PROBING_FOR_UPGRADE,	/* Probing for service upgrade */
@@ -411,6 +410,7 @@ enum rxrpc_conn_event {
  */
 enum rxrpc_conn_proto_state {
 	RXRPC_CONN_UNUSED,		/* Connection not yet attempted */
+	RXRPC_CONN_CLIENT_UNSECURED,	/* Client connection needs security init */
 	RXRPC_CONN_CLIENT,		/* Client connection */
 	RXRPC_CONN_SERVICE_PREALLOC,	/* Service connection preallocation */
 	RXRPC_CONN_SERVICE_UNSECURED,	/* Service unsecured connection */
@@ -433,11 +433,8 @@ struct rxrpc_bundle {
 	u32			security_level;	/* Security level selected */
 	u16			service_id;	/* Service ID for this connection */
 	bool			try_upgrade;	/* True if the bundle is attempting upgrade */
-	bool			alloc_conn;	/* True if someone's getting a conn */
 	bool			exclusive;	/* T if conn is exclusive */
 	bool			upgrade;	/* T if service ID can be upgraded */
-	short			alloc_error;	/* Error from last conn allocation */
-	spinlock_t		channel_lock;
 	struct rb_node		local_node;	/* Node in local->client_conns */
 	struct list_head	waiting_calls;	/* Calls waiting for channels */
 	unsigned long		avail_chans;	/* Mask of available channels */
@@ -463,7 +460,7 @@ struct rxrpc_connection {
 	unsigned char		act_chans;	/* Mask of active channels */
 	struct rxrpc_channel {
 		unsigned long	final_ack_at;	/* Time at which to issue final ACK */
-		struct rxrpc_call __rcu *call;	/* Active call */
+		struct rxrpc_call *call;	/* Active call */
 		unsigned int	call_debug_id;	/* call->debug_id */
 		u32		call_id;	/* ID of current call */
 		u32		call_counter;	/* Call ID counter */
@@ -483,6 +480,7 @@ struct rxrpc_connection {
 	struct list_head	link;		/* link in master connection list */
 	struct sk_buff_head	rx_queue;	/* received conn-level packets */

+	struct mutex		security_lock;	/* Lock for security management */
 	const struct rxrpc_security *security;	/* applied security module */
 	union {
 		struct {
@@ -613,7 +611,7 @@ struct rxrpc_call {
 	struct timer_list	timer;		/* Combined event timer */
 	rxrpc_notify_rx_t	notify_rx;	/* kernel service Rx notification function */
 	struct list_head	link;		/* link in master call list */
-	struct list_head	chan_wait_link;	/* Link in conn->bundle->waiting_calls */
+	struct list_head	wait_link;	/* Link in local->new_client_calls */
 	struct hlist_node	error_link;	/* link in error distribution list */
 	struct list_head	accept_link;	/* Link in rx->acceptq */
 	struct list_head	recvmsg_link;	/* Link in rx->recvmsg_q */
@@ -862,6 +860,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
 					 struct sockaddr_rxrpc *,
 					 struct rxrpc_call_params *, gfp_t,
 					 unsigned int);
+void rxrpc_start_call_timer(struct rxrpc_call *);
 void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
 			 struct sk_buff *);
 void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
@@ -896,6 +895,7 @@ static inline void rxrpc_set_call_state(struct rxrpc_call *call,
 {
 	/* Order write of completion info before write of ->state. */
 	smp_store_release(&call->_state, state);
+	wake_up(&call->waitq);
 }

 static inline enum rxrpc_call_state __rxrpc_call_state(const struct rxrpc_call *call)
@@ -932,10 +932,11 @@ extern unsigned int rxrpc_reap_client_connections;
 extern unsigned long rxrpc_conn_idle_client_expiry;
 extern unsigned long rxrpc_conn_idle_client_fast_expiry;

-void rxrpc_destroy_client_conn_ids(struct rxrpc_local *);
+void rxrpc_purge_client_connections(struct rxrpc_local *);
 struct rxrpc_bundle *rxrpc_get_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace);
 void rxrpc_put_bundle(struct rxrpc_bundle *, enum rxrpc_bundle_trace);
-int rxrpc_connect_call(struct rxrpc_call *, gfp_t);
+int rxrpc_look_up_bundle(struct rxrpc_call *, gfp_t);
+void rxrpc_connect_client_calls(struct rxrpc_local *);
 void rxrpc_expose_client_call(struct rxrpc_call *);
 void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *);
 void rxrpc_put_client_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
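The wake_up() added to rxrpc_set_call_state() above is the kernel half of the app-thread wait described in the commit message: smp_store_release() publishes the new state, and the waiter rereads it with acquire semantics before deciding whether to keep sleeping. The waiting side is not part of this excerpt; it presumably takes roughly the following form (the helper and state names here are assumptions based on rxrpc's existing conventions, not lines from this commit):

	/* Sketch only: the real wait lives in rxrpc's sendmsg path. */
	wait_event_interruptible(call->waitq,
				 rxrpc_call_state(call) != RXRPC_CALL_CLIENT_AWAIT_CONN);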
51 changes: 42 additions & 9 deletions net/rxrpc/call_object.c
@@ -152,7 +152,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,

 	timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
 	INIT_LIST_HEAD(&call->link);
-	INIT_LIST_HEAD(&call->chan_wait_link);
+	INIT_LIST_HEAD(&call->wait_link);
 	INIT_LIST_HEAD(&call->accept_link);
 	INIT_LIST_HEAD(&call->recvmsg_link);
 	INIT_LIST_HEAD(&call->sock_link);
@@ -244,7 +244,7 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
 /*
  * Initiate the call ack/resend/expiry timer.
  */
-static void rxrpc_start_call_timer(struct rxrpc_call *call)
+void rxrpc_start_call_timer(struct rxrpc_call *call)
 {
 	unsigned long now = jiffies;
 	unsigned long j = now + MAX_JIFFY_OFFSET;
@@ -288,6 +288,40 @@ static void rxrpc_put_call_slot(struct rxrpc_call *call)
 	up(limiter);
 }

+/*
+ * Start the process of connecting a call.  We obtain a peer and a connection
+ * bundle, but the actual association of a call with a connection is offloaded
+ * to the I/O thread to simplify locking.
+ */
+static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
+{
+	struct rxrpc_local *local = call->local;
+	int ret = 0;
+
+	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);
+
+	rxrpc_get_call(call, rxrpc_call_get_io_thread);
+
+	call->peer = rxrpc_lookup_peer(local, &call->dest_srx, gfp);
+	if (!call->peer)
+		goto error;
+
+	ret = rxrpc_look_up_bundle(call, gfp);
+	if (ret < 0)
+		goto error;
+
+	rxrpc_get_call(call, rxrpc_call_get_wait_connect);
+	spin_lock(&local->client_call_lock);
+	list_add_tail(&call->wait_link, &local->new_client_calls);
+	spin_unlock(&local->client_call_lock);
+	return 0;
+
+error:
+	trace_rxrpc_client(call->conn, ret, rxrpc_client_chan_wait_failed);
+	rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, ret);
+	return ret;
+}
+
 /*
  * Set up a call for the given parameters.
  * - Called with the socket lock held, which it must release.
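The consumer that drains local->new_client_calls is rxrpc_connect_client_calls(), declared in ar-internal.h above but defined in one of the files not shown in this excerpt. A sketch of the shape it plausibly takes, given the list and lock fields added to struct rxrpc_local; this is an assumption for illustration, not the committed body:

	/* Assumed shape of the I/O-thread consumer (illustrative only). */
	void rxrpc_connect_client_calls(struct rxrpc_local *local)
	{
		struct rxrpc_call *call;

		for (;;) {
			spin_lock(&local->client_call_lock);
			call = list_first_entry_or_null(&local->new_client_calls,
							struct rxrpc_call, wait_link);
			if (call)
				list_del_init(&call->wait_link);
			spin_unlock(&local->client_call_lock);
			if (!call)
				break;
			/* Bind the call to a bundle/connection channel here;
			 * only the I/O thread does this, so no conn locking. */
		}
	}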
@@ -371,10 +405,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	if (ret < 0)
 		goto error_attached_to_socket;

-	rxrpc_see_call(call, rxrpc_call_see_connected);
-
-	rxrpc_start_call_timer(call);
-
 	_leave(" = %p [new]", call);
 	return call;

@@ -459,7 +489,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
 	chan = sp->hdr.cid & RXRPC_CHANNELMASK;
 	conn->channels[chan].call_counter = call->call_id;
 	conn->channels[chan].call_id = call->call_id;
-	rcu_assign_pointer(conn->channels[chan].call, call);
+	conn->channels[chan].call = call;
 	spin_unlock(&conn->state_lock);

 	spin_lock(&conn->peer->lock);
@@ -518,7 +548,7 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
 void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
 	struct rxrpc_connection *conn = call->conn;
-	bool put = false;
+	bool put = false, putu = false;

 	_enter("{%d,%d}", call->debug_id, refcount_read(&call->ref));

@@ -553,14 +583,17 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 	if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 		rb_erase(&call->sock_node, &rx->calls);
 		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
-		rxrpc_put_call(call, rxrpc_call_put_userid_exists);
+		putu = true;
 	}

 	list_del(&call->sock_link);
 	write_unlock(&rx->call_lock);

 	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);

+	if (putu)
+		rxrpc_put_call(call, rxrpc_call_put_userid);
+
 	_leave("");
 }
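One detail worth noting in the rxrpc_release_call() hunk: the user-ID reference is now only flagged for release (putu = true) while rx->call_lock is held, and the actual rxrpc_put_call() happens after write_unlock(). The usual motivation for this pattern is that dropping what may be the final reference can trigger object teardown, which must not run under the lock guarding the lookup tree the object was just unlinked from. In generic outline (all names here are illustrative, not rxrpc code):

	bool drop_ref = false;

	write_lock(&index_lock);
	if (unlink_from_index(obj))	/* unlink under the lock... */
		drop_ref = true;
	write_unlock(&index_lock);

	if (drop_ref)
		put_object(obj);	/* ...but put after it is released */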
