Skip to content

Commit

Permalink
Fix race in TCP Main <> TCP Worker communication
Browse files Browse the repository at this point in the history
As a TCP worker has 2 comm channels (one TCP worker specific and one worker generic) to TCP Main, be sure we have a consistent communication (over the 2 channels) between the TCP worker and TCP main.
The actual fixed race: the TCP workers creates a new TCP conn (and sends this event as TCP worker to Main) and when trying to write to the conn it fails (and sends back a bad-conn event to Main, but as generic worker) - the 2 events are delivered via different comm channels, so their order may be lost, reasulting to bogus behavior
  • Loading branch information
bogdan-iancu committed Mar 10, 2023
1 parent addfa63 commit bfb6221
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 35 deletions.
14 changes: 4 additions & 10 deletions net/net_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1312,9 +1312,7 @@ inline static int handle_tcp_worker(struct tcp_worker* tcp_c, int fd_i)
sh_log(tcpconn->hist, TCP_UNREF, "tcpworker release write, (%d)", tcpconn->refcnt);
tcpconn_put(tcpconn);
break;
case ASYNC_WRITE:
/* fall through*/
case ASYNC_WRITE2:
case ASYNC_WRITE_TCPW:
if (tcpconn->state==S_CONN_BAD){
sh_log(tcpconn->hist, TCP_UNREF, "tcpworker async write bad, (%d)", tcpconn->refcnt);
tcpconn_destroy(tcpconn);
Expand All @@ -1326,12 +1324,10 @@ inline static int handle_tcp_worker(struct tcp_worker* tcp_c, int fd_i)
reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
tcpconn->flags&=~F_CONN_REMOVED_WRITE;
break;
case CONN_ERROR:
case CONN_ERROR_TCPW:
case CONN_DESTROY:
case CONN_EOF:
/* WARNING: this will auto-dec. refcnt! */
/* fall through*/
case CONN_ERROR2:
if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED &&
(tcpconn->s!=-1)){
reactor_del_all( tcpconn->s, -1, IO_FD_CLOSING);
Expand Down Expand Up @@ -1427,8 +1423,7 @@ inline static int handle_worker(struct process_table* p, int fd_i)
goto end;
}
switch(cmd){
case CONN_ERROR:
case CONN_ERROR2:
case CONN_ERROR_GENW:
/* remove from reactor only if the fd exists, and it wasn't
* removed before */
if ((tcpconn->flags & F_CONN_REMOVED) != F_CONN_REMOVED &&
Expand Down Expand Up @@ -1480,8 +1475,7 @@ inline static int handle_worker(struct process_table* p, int fd_i)
reactor_add_writer( tcpconn->s, F_TCPCONN, RCT_PRIO_NET, tcpconn);
tcpconn->flags&=~F_CONN_REMOVED_WRITE;
break;
case ASYNC_WRITE:
case ASYNC_WRITE2:
case ASYNC_WRITE_GENW:
if (tcpconn->state==S_CONN_BAD){
tcpconn->lifetime=0;
break;
Expand Down
51 changes: 30 additions & 21 deletions net/net_tcp_proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ extern struct struct_hist_list *con_hist;
#define tcpconn_release_error(_conn, _writer, _reason) \
do { \
tcp_trigger_report( _conn, TCP_REPORT_CLOSE, _reason);\
tcpconn_release( _conn, CONN_ERROR, _writer);\
tcpconn_release( _conn, CONN_ERROR_TCPW, _writer, 1/*as TCP worker*/);\
}while(0)




static void tcpconn_release(struct tcp_connection* c, long state,int writer)
static void tcpconn_release(struct tcp_connection* c, long state, int writer,
int as_tcp_worker)
{
long response[2];

Expand All @@ -83,7 +84,7 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer)
response[0]=(long)c;
response[1]=state;

if (send_all((tcpmain_sock==-1)?unix_tcp_sock:tcpmain_sock, response,
if (send_all( as_tcp_worker?tcpmain_sock:unix_tcp_sock, response,
sizeof(response))<=0)
LM_ERR("send_all failed state=%ld con=%p\n", state, c);
}
Expand All @@ -99,11 +100,11 @@ void tcp_conn_release(struct tcp_connection* c, int pending_data)
if (c->state==S_CONN_BAD) {
c->lifetime=0;
/* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put !!*/
tcpconn_release(c, CONN_ERROR2,1);
tcpconn_release(c, CONN_ERROR_GENW, 1, 0 /*not TCP, but GEN worker*/);
return;
}
if (pending_data) {
tcpconn_release(c, ASYNC_WRITE2,1);
tcpconn_release(c, ASYNC_WRITE_GENW, 1, 0 /*not TCP, but GEN worker*/);
return;
}
tcpconn_put(c);
Expand All @@ -117,7 +118,7 @@ int tcp_done_reading(struct tcp_connection* con)
tcpconn_check_del(con);
tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
if (con->fd!=-1) { close(con->fd); con->fd = -1; }
tcpconn_release(con, CONN_RELEASE,0);
tcpconn_release(con, CONN_RELEASE, 0, 1 /*as TCP proc*/);

_tcp_done_reading_marker = 1;
}
Expand Down Expand Up @@ -173,7 +174,7 @@ static void tcp_receive_timeout(void)
tcpconn_release_error(con, 0, "Read timeout with"
"incomplete SIP message");
else
tcpconn_release(con, CONN_RELEASE,0);
tcpconn_release(con, CONN_RELEASE, 0, 1 /*as TCP proc*/);
}
}
}
Expand Down Expand Up @@ -298,25 +299,31 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
/* save FD which is valid in context of this TCP worker */
con->fd=s;
} else if (rw & IO_WATCH_WRITE) {
LM_DBG("Received con for async write %p ref = %d\n",con,con->refcnt);
LM_DBG("Received con for async write %p ref = %d\n",
con, con->refcnt);
lock_get(&con->write_lock);
resp = protos[con->type].net.write( (void*)con, s );
lock_release(&con->write_lock);
if (resp<0) {
ret=-1; /* some error occurred */
con->state=S_CONN_BAD;
sh_log(con->hist, TCP_SEND2MAIN, "handle write, err, state: %d, att: %d",
con->state, con->msg_attempts);
sh_log(con->hist, TCP_SEND2MAIN,
"handle write, err, state: %d, att: %d",
con->state, con->msg_attempts);
tcpconn_release_error(con, 1,"Write error");
break;
} else if (resp==1) {
sh_log(con->hist, TCP_SEND2MAIN, "handle write, async, state: %d, att: %d",
con->state, con->msg_attempts);
tcpconn_release(con, ASYNC_WRITE,1);
sh_log(con->hist, TCP_SEND2MAIN,
"handle write, async, state: %d, att: %d",
con->state, con->msg_attempts);
tcpconn_release(con, ASYNC_WRITE_TCPW, 1,
1 /*as TCP proc*/);
} else {
sh_log(con->hist, TCP_SEND2MAIN, "handle write, ok, state: %d, att: %d",
con->state, con->msg_attempts);
tcpconn_release(con, CONN_RELEASE_WRITE,1);
sh_log(con->hist, TCP_SEND2MAIN,
"handle write, ok, state: %d, att: %d",
con->state, con->msg_attempts);
tcpconn_release(con, CONN_RELEASE_WRITE, 1,
1/*as TCP proc*/);
}
ret = 0;
/* we always close the socket received for writing */
Expand All @@ -336,8 +343,9 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
con->proc_id = -1;
if (con->fd!=-1) { close(con->fd); con->fd = -1; }
sh_log(con->hist, TCP_SEND2MAIN, "handle read, err, resp: %d, att: %d",
resp, con->msg_attempts);
sh_log(con->hist, TCP_SEND2MAIN,
"handle read, err, resp: %d, att: %d",
resp, con->msg_attempts);
tcpconn_release_error(con, 0, "Read error");
} else if (con->state==S_CONN_EOF) {
reactor_del_all( con->fd, idx, IO_FD_CLOSING );
Expand All @@ -347,9 +355,10 @@ inline static int handle_io(struct fd_map* fm, int idx,int event_type)
if (con->fd!=-1) { close(con->fd); con->fd = -1; }
tcp_trigger_report( con, TCP_REPORT_CLOSE,
"EOF received");
sh_log(con->hist, TCP_SEND2MAIN, "handle read, EOF, resp: %d, att: %d",
resp, con->msg_attempts);
tcpconn_release(con, CONN_EOF,0);
sh_log(con->hist, TCP_SEND2MAIN,
"handle read, EOF, resp: %d, att: %d",
resp, con->msg_attempts);
tcpconn_release(con, CONN_EOF, 0, 1 /*as TCP proc*/);
} else {
if (con->profile.parallel_read)
/* return the connection if not already */
Expand Down
11 changes: 7 additions & 4 deletions net/tcp_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@


/* fd communication commands - internal usage ONLY */
enum conn_cmds { CONN_DESTROY=-4, CONN_ERROR=-3,CONN_ERROR2=-2, CONN_EOF=-1, CONN_RELEASE,
CONN_GET_FD, CONN_NEW, ASYNC_CONNECT, ASYNC_WRITE, ASYNC_WRITE2, CONN_RELEASE_WRITE };
/* CONN_RELEASE, EOF, ERROR, DESTROY can be used by "reader" processes
* CONN_GET_FD, NEW, ERROR only by writers */
enum conn_cmds { CONN_DESTROY=-4, CONN_ERROR_TCPW=-3,CONN_ERROR_GENW=-2,
CONN_EOF=-1, CONN_RELEASE, CONN_GET_FD, CONN_NEW, ASYNC_CONNECT,
ASYNC_WRITE_TCPW, ASYNC_WRITE_GENW, CONN_RELEASE_WRITE };
/* CONN_RELEASE[_WRITE], EOF, ERROR_TCPW, ASYNC_WRITE_TCPW, DESTROY can be
* used by TCP "reader" workers/processes
* CONN_GET_FD, NEW, CONNECT, ERROR_GENW, ASYNC_WRITE_GENW only by generic
* writer workers/processes */

#ifdef TCP_DEBUG_CONN
#define tcpconn_check_add(c) \
Expand Down

0 comments on commit bfb6221

Please sign in to comment.