Permalink
Browse files

Revert "- use nonblockign reads/writes to signal timeouts instead of"

This reverts commit 77dac84.

I thought my use of 'select' was causing bugs, it wasn't. The problem I
found was in logstash's lumberjack input.

Conflicts:
	proto.c
  • Loading branch information...
1 parent f409b4e commit 93012676b1722afa6a80f2e8713572b540f876f8 @jordansissel jordansissel committed Mar 7, 2013
Showing with 102 additions and 36 deletions.
  1. +102 −36 proto.c
View
138 proto.c
@@ -14,7 +14,6 @@
#include <errno.h>
#include <sys/stat.h>
#include <netdb.h>
-#include <fcntl.h>
#include "zlib.h"
#include "backoff.h"
@@ -31,7 +30,6 @@
#define LUMBERJACK_POLL_READ 0x01
#define LUMBERJACK_POLL_WRITE 0x02
-
static void lumberjack_init(void);
static int lumberjack_tcp_connect(struct lumberjack *lumberjack);
static int lumberjack_ssl_handshake(struct lumberjack *lumberjack);
@@ -42,13 +40,6 @@ static int lumberjack_write_window_size(struct lumberjack *lumberjack);
static int lumberjack_poll(struct lumberjack *lumberjack,
time_t seconds, int flags);
-static int lumberjack_ssl_write(const struct lumberjack *lumberjack,
- const void *buffer, const int length,
- const double timeout_seconds);
-static int lumberjack_ssl_read(const struct lumberjack *lumberjack, void *buffer,
- const int length,
- const double timeout_seconds);
-
static int lumberjack_init_done = 0;
static unsigned int rand_uint32() {
@@ -308,9 +299,7 @@ static int lumberjack_ssl_handshake(struct lumberjack *lumberjack) {
insist(lumberjack->ssl != NULL, "SSL_new must not return NULL");
SSL_set_connect_state(lumberjack->ssl); /* we're a client */
- //SSL_set_mode(lumberjack->ssl, SSL_MODE_AUTO_RETRY);
- fcntl(lumberjack->fd, F_SETFL, O_NONBLOCK);
- BIO_set_nbio(bio, 1);
+ SSL_set_mode(lumberjack->ssl, SSL_MODE_AUTO_RETRY); /* retry writes/reads that would block */
SSL_set_bio(lumberjack->ssl, bio, bio);
struct backoff sleeper;
@@ -370,6 +359,7 @@ int lumberjack_write(struct lumberjack *lumberjack, struct str *payload) {
} /* lumberjack_write */
int lumberjack_flush(struct lumberjack *lumberjack) {
+ ssize_t bytes;
size_t length = str_length(lumberjack->io_buffer);
/* Zlib */
int rc;
@@ -401,34 +391,85 @@ int lumberjack_flush(struct lumberjack *lumberjack) {
str_append_char(header, LUMBERJACK_COMPRESSED_BLOCK_FRAME);
str_append_uint32(header, compressed_length);
- double timeout = 10.0; // seconds
+ time_t timeout = 30;
+ rc = lumberjack_poll(lumberjack, timeout, LUMBERJACK_POLL_WRITE);
+ if ((rc & LUMBERJACK_POLL_WRITE) == 0) {
+ /* socket was not writable after the given timeout. Fail it. */
+ flog(stdout, "Waited %d seconds for a writable socket. Giving up.", timeout);
+ lumberjack_disconnect(lumberjack);
+ return -1;
+ }
+
flog_if_slow(stdout, 1.0, {
- rc = lumberjack_ssl_write(lumberjack, str_data(header),
- str_length(header), timeout);
+ bytes = SSL_write(lumberjack->ssl, str_data(header), str_length(header));
}, "SSL_write (lumberjack compressed header)", NULL);
str_free(header);
- if (rc < 0) {
- flog(stdout, "lumberjack_ssl_write failed while sending header: %s",
- strerror(errno));
+ if (bytes < 0) {
+ /* error occurred while writing. */
+ rc = SSL_get_error(lumberjack->ssl, bytes);
+ if (rc == SSL_ERROR_SYSCALL) {
+ flog(stdout, "SSL_write failed: %s", strerror(errno));
+ } else {
+ flog(stdout, "SSL_write returned %d (code: %d), something is wrong",
+ bytes, rc);
+ flog(stdout, "SSL_write error vv");
+ ERR_print_errors_fp(stdout);
+ flog(stdout, "SSL_write error ^^");
+ }
lumberjack_disconnect(lumberjack);
return -1;
}
/* write the compressed payload */
- flog_if_slow(stdout, 1.0, {
- rc = lumberjack_ssl_write(lumberjack,
- str_data(lumberjack->compression_buffer),
- compressed_length, timeout);
- }, "lumberjack_ssl_write (compressed payload)", NULL);
+ ssize_t remaining = compressed_length;
+ size_t offset = 0;
+
+ struct timeval start;
+ gettimeofday(&start, NULL);
+ double elapsed;
- if (rc < 0) {
- flog(stdout, "lumberjack_ssl_write failed while sending payload: %s",
- strerror(errno));
- lumberjack_disconnect(lumberjack);
- return -1;
- }
+ struct backoff sleeper;
+ backoff_init(&sleeper, &MIN_SLEEP, &MAX_SLEEP);
+
+ while (remaining > 0) {
+ time_t timeout = 30;
+ rc = lumberjack_poll(lumberjack, timeout, LUMBERJACK_POLL_WRITE);
+ if ((rc & LUMBERJACK_POLL_WRITE) == 0) {
+ /* socket was not writable after the given timeout. Fail it. */
+ flog(stdout, "Waited %d seconds for a writable socket. Giving up.", timeout);
+ lumberjack_disconnect(lumberjack);
+ return -1;
+ }
+
+ bytes = SSL_write(lumberjack->ssl,
+ str_data(lumberjack->compression_buffer) + offset,
+ remaining);
+ if (bytes < 0) {
+ elapsed = duration(&start);
+ if (elapsed > 30) {
+ flog(stdout, "SSL_write took too long (%.3f seconds), assuming " \
+ "dead/busy server and disconnecting.", elapsed);
+ lumberjack_disconnect(lumberjack);
+ return -1;
+ }
+
+ rc = SSL_get_error(lumberjack->ssl, bytes);
+ if (rc == SSL_ERROR_WANT_READ) {
+ /* TODO(sissel): instead of backing off, select for read, then retry
+ * the write. */
+ backoff(&sleeper);
+ } else if (rc == SSL_ERROR_WANT_WRITE) {
+ /* TODO(sissel): instead of backing off, select for write, then retry
+ * the write. */
+ backoff(&sleeper);
+ }
+ }
+
+ remaining -= bytes;
+ offset += bytes;
+ }
return 0;
} /* lumberjack_flush */
@@ -464,15 +505,40 @@ static int lumberjack_read_ack(struct lumberjack *lumberjack,
* but the idea is that you can do bulk acks, so data-to-ack ratio should be
* high */
char buf[6];
+ ssize_t bytes;
+ size_t remaining = 6; /* version + frame type + 32bit sequence value */
+ size_t offset = 0;
+
int rc;
- size_t length = 6; /* version + frame type + 32bit sequence value */
- double timeout = 10.0; // seconds
- flog_if_slow(stdout, 1.0, {
- rc = lumberjack_ssl_read(lumberjack, buf, length, timeout);
- }, "SSL_read (tried to read %d bytes)", length);
+ while (remaining > 0) {
+ /* Allow a few seconds for a read timeout. If it occurs, fail this read.
+ * This timeout should cause a disconnect and reconnect to a new server.
+ * The idea is to prevent one receiving server from becoming overloaded. */
+ time_t timeout = 30;
+ rc = lumberjack_poll(lumberjack, timeout, LUMBERJACK_POLL_READ);
+ if ((rc & LUMBERJACK_POLL_READ) == 0) {
+ /* socket was not writable after the given timeout. Fail it. */
+ flog(stdout, "Waited %d seconds for a readable socket. Giving up.", timeout);
+ errno = ETIMEDOUT;
+ return -1;
+ }
- if (rc < 0) {
- return -1; /* timeout or other error condition */
+ flog_if_slow(stdout, 1.0, {
+ bytes = SSL_read(lumberjack->ssl, buf + offset, remaining);
+ }, "SSL_read (tried to read %d bytes)", remaining);
+ if (bytes == 0) {
+ /* EOF or some other similar error */
+ errno = EPIPE; /* close enough to fake EOF? */
+ return -1;
+ } else if (bytes < 0) {
+ rc = SSL_get_error(lumberjack->ssl, bytes /* error code */);
+ flog(stdout, "SSL_read error vv");
+ ERR_print_errors_fp(stdout);
+ flog(stdout, "SSL_read error ^^");
+ return -1;
+ }
+ offset += bytes;
+ remaining -= bytes;
}
if ((buf[0] != LUMBERJACK_VERSION_1) || (buf[1] != LUMBERJACK_ACK_FRAME)) {

0 comments on commit 9301267

Please sign in to comment.