Skip to content

Commit

Permalink
Recalculate the timeout of conn#wait_for_notify and conn#block in
Browse files Browse the repository at this point in the history
case of socket events that require re-runs of select().

This previously caused a timeout higher than the given value,
because the timeout was set to the original value for every re-run.
  • Loading branch information
larskanis committed Jul 18, 2013
1 parent 4c5704a commit 241ab21
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 11 deletions.
24 changes: 24 additions & 0 deletions ext/pg.h
Expand Up @@ -66,6 +66,30 @@
# include "ruby/io.h"
#endif

#ifndef timeradd
#define timeradd(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec + (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec + (b)->tv_usec; \
if ((result)->tv_usec >= 1000000L) { \
++(result)->tv_sec; \
(result)->tv_usec -= 1000000L; \
} \
} while (0)
#endif

#ifndef timersub
#define timersub(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
if ((result)->tv_usec < 0) { \
--(result)->tv_sec; \
(result)->tv_usec += 1000000L; \
} \
} while (0)
#endif

/* PostgreSQL headers */
#include "libpq-fe.h"
#include "libpq/libpq-fs.h" /* large-object interface */
Expand Down
47 changes: 39 additions & 8 deletions ext/pg_connection.c
Expand Up @@ -2214,6 +2214,7 @@ wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readabl
{
int sd = PQsocket( conn );
void *retval;
struct timeval aborttime={0,0}, currtime, waittime;
DWORD timeout_milisec = INFINITE;
DWORD wait_ret;
WSAEVENT hEvent;
Expand All @@ -2223,23 +2224,36 @@ wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readabl

hEvent = WSACreateEvent();

if ( ptimeout ) {
timeout_milisec = (DWORD)( ptimeout->tv_sec * 1e3 + ptimeout->tv_usec / 1e3 );
}

/* Check for connection errors (PQisBusy is true on connection errors) */
if( PQconsumeInput(conn) == 0 ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "PQconsumeInput() %s", PQerrorMessage(conn) );
}

if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timeradd(&currtime, ptimeout, &aborttime);
}

while ( !(retval=is_readable(conn)) ) {
if ( WSAEventSelect(sd, hEvent, FD_READ|FD_CLOSE) == SOCKET_ERROR ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "WSAEventSelect socket error: %d", WSAGetLastError() );
}

wait_ret = rb_w32_wait_events( &hEvent, 1, timeout_milisec );
if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timersub(&aborttime, &currtime, &waittime);
timeout_milisec = (DWORD)( waittime.tv_sec * 1e3 + waittime.tv_usec / 1e3 );
}

/* Is the given timeout valid? */
if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){
/* Wait for the socket to become readable before checking again */
wait_ret = rb_w32_wait_events( &hEvent, 1, timeout_milisec );
} else {
wait_ret = WAIT_TIMEOUT;
}

if ( wait_ret == WAIT_TIMEOUT ) {
WSACloseEvent( hEvent );
Expand Down Expand Up @@ -2280,6 +2294,7 @@ wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readabl
int ret;
void *retval;
rb_fdset_t sd_rset;
struct timeval aborttime={0,0}, currtime, waittime;
#ifdef _WIN32
rb_fdset_t crt_sd_rset;
#endif
Expand All @@ -2291,7 +2306,12 @@ wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readabl
if ( PQconsumeInput(conn) == 0 )
rb_raise( rb_eConnectionBad, "PQconsumeInput() %s", PQerrorMessage(conn) );

rb_fd_init( &sd_rset );
rb_fd_init( &sd_rset );

if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timeradd(&currtime, ptimeout, &aborttime);
}

while ( !(retval=is_readable(conn)) ) {
rb_fd_zero( &sd_rset );
Expand All @@ -2305,8 +2325,19 @@ wait_socket_readable( PGconn *conn, struct timeval *ptimeout, void *(*is_readabl
create_crt_fd(&sd_rset, &crt_sd_rset);
#endif

/* Wait for the socket to become readable before checking again */
ret = rb_thread_fd_select( sd+1, &sd_rset, NULL, NULL, ptimeout );
if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timersub(&aborttime, &currtime, &waittime);
}

/* Is the given timeout valid? */
if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){
/* Wait for the socket to become readable before checking again */
ret = rb_thread_fd_select( sd+1, &sd_rset, NULL, NULL, ptimeout ? &waittime : NULL );
} else {
ret = 0;
}


#ifdef _WIN32
cleanup_crt_fd(&sd_rset, &crt_sd_rset);
Expand Down
16 changes: 13 additions & 3 deletions spec/pg/connection_spec.rb
Expand Up @@ -427,9 +427,19 @@
@conn.exec( 'UNLISTEN woo' )
end

it "can receive notices while waiting for NOTIFY" do
@conn.send_query "do $$ BEGIN RAISE NOTICE 'woohoo'; END; $$ LANGUAGE plpgsql;"
@conn.wait_for_notify( 0.5 ).should be_nil
it "can receive notices while waiting for NOTIFY without exceeding the timeout" do
notices = []
@conn.set_notice_processor do |msg|
notices << [msg, Time.now]
end
st = Time.now
@conn.send_query "SELECT pg_sleep(0.5); do $$ BEGIN RAISE NOTICE 'woohoo'; END; $$ LANGUAGE plpgsql;"
@conn.wait_for_notify( 1 ).should be_nil
notices.first.should_not be_nil
et = Time.now
(et - notices.first[1]).should >= 0.4
(et - st).should >= 0.9
(et - st).should < 1.4
end

it "yields the result if block is given to exec" do
Expand Down

0 comments on commit 241ab21

Please sign in to comment.