Skip to content

Commit

Permalink
Yet another rework and bug fixes, but right now I have wss clients wo…
Browse files Browse the repository at this point in the history
…rking as it seems!
  • Loading branch information
babelouest committed Oct 8, 2018
1 parent ab7d7b2 commit 1887a79
Showing 1 changed file with 32 additions and 52 deletions.
84 changes: 32 additions & 52 deletions src/u_websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
#include <netdb.h>
#include <stdlib.h>

#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)

/**********************************/
/** Internal websocket functions **/
/**********************************/

static int is_websocket_data_available(struct _websocket_manager * websocket_manager, int block) {
static int is_websocket_data_available(struct _websocket_manager * websocket_manager) {
int ret = 0, poll_ret = 0;

do {
Expand All @@ -56,18 +59,20 @@ static int is_websocket_data_available(struct _websocket_manager * websocket_man
} else if (poll_ret > 0) {
ret = 1;
}
} while (block && ret == -1);
} while (ret == -1);
return ret;
}

static size_t read_data_from_handshake_response(struct _websocket_manager * websocket_manager, uint8_t * data, size_t len) {
static size_t read_data_from_socket(struct _websocket_manager * websocket_manager, uint8_t * data, size_t len) {
size_t ret = 0;
ssize_t data_len;

if (len > 0) {
do {
if (websocket_manager->tls) {
data_len = gnutls_record_recv(websocket_manager->gnutls_session, data, (len - ret));
} else if (websocket_manager->type == U_WEBSOCKET_SERVER) {
data_len = read(websocket_manager->mhd_sock, data, (len - ret));
} else {
data_len = read(websocket_manager->tcp_sock, data, (len - ret));
}
Expand All @@ -82,37 +87,6 @@ static size_t read_data_from_handshake_response(struct _websocket_manager * webs
return ret;
}

static size_t read_data_from_socket(struct _websocket_manager * websocket_manager, uint8_t * data, size_t len) {
size_t ret = 0;
ssize_t data_len;
int data_available;

if (len > 0) {
do {
if ((data_available = is_websocket_data_available(websocket_manager, 1))) {
if (websocket_manager->type == U_WEBSOCKET_SERVER) {
data_len = read(websocket_manager->mhd_sock, data, (len - ret));
} else {
if (websocket_manager->tls) {
data_len = gnutls_record_recv(websocket_manager->gnutls_session, data, (len - ret));
} else {
data_len = read(websocket_manager->tcp_sock, data, (len - ret));
}
}
if (data_len > 0) {
ret += data_len;
} else if (data_len < 0) {
ret = -1;
data_available = 0;
}
} else {
ret = -1;
}
} while (data_available && ret < len);
}
return ret;
}

/**
* Workaround to make sure a message, as long as it can be is complete sent
*/
Expand Down Expand Up @@ -318,7 +292,7 @@ static int ulfius_send_websocket_message_managed(struct _websocket_manager * web
pthread_mutex_unlock(&websocket_manager->write_lock);
}
} else {
y_log_message(Y_LOG_LEVEL_DEBUG, "Ulfius - Error ulfius_send_websocket_message_managed params");
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error ulfius_send_websocket_message_managed params");
ret = U_ERROR_PARAMS;
}
return ret;
Expand Down Expand Up @@ -430,6 +404,9 @@ static int ulfius_read_incoming_message(struct _websocket_manager * websocket_ma
}
o_free(payload_data);
}
if (!fin) {
while (!is_websocket_data_available(websocket_manager));
}
}
} while (ret == U_OK && !fin);
} else {
Expand All @@ -450,11 +427,10 @@ void * ulfius_thread_websocket_manager_run(void * args) {
websocket->websocket_manager_callback(websocket->request, websocket->websocket_manager, websocket->websocket_manager_user_data);

// Send close message if the websocket is still open
if (websocket->websocket_manager->connected && ulfius_websocket_send_message(websocket->websocket_manager, U_WEBSOCKET_OPCODE_CLOSE, 0, NULL) != U_OK) {
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error sending close message after closing callback manager");
if (websocket->websocket_manager->connected) {
websocket->websocket_manager->close_flag = 1;
}
}

return NULL;
}

Expand Down Expand Up @@ -486,7 +462,7 @@ void * ulfius_thread_websocket(void * data) {
}
websocket->websocket_manager->connected = 0;
} else {
if (is_websocket_data_available(websocket->websocket_manager, 0)) {
if (is_websocket_data_available(websocket->websocket_manager)) {
if (pthread_mutex_lock(&websocket->websocket_manager->read_lock)) {
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error locking websocket read lock messages");
websocket->websocket_manager->connected = 0;
Expand Down Expand Up @@ -544,7 +520,6 @@ void * ulfius_thread_websocket(void * data) {
} else {
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error websocket parameters");
}

pthread_exit(NULL);
}

Expand All @@ -560,7 +535,7 @@ static int ulfius_get_next_line_from_http_response(struct _websocket * websocket

*line_len = 0;
do {
if (read_data_from_handshake_response(websocket->websocket_manager, &car, 1) == 1) {
if (read_data_from_socket(websocket->websocket_manager, &car, 1) == 1) {
buffer[offset] = car;
}

Expand Down Expand Up @@ -681,8 +656,8 @@ static int ulfius_websocket_connection_handshake(struct _u_request * request, st
response->binary_body_length = strtol(u_map_get(response->map_header, "Content-Length"), NULL, 10);
response->binary_body = o_malloc(response->binary_body_length);
if (response->binary_body != NULL) {
if (read_data_from_handshake_response(websocket->websocket_manager, response->binary_body, response->binary_body_length) != response->binary_body_length) {
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error read_data_from_handshake_response for response->binary_body");
if (read_data_from_socket(websocket->websocket_manager, response->binary_body, response->binary_body_length) != response->binary_body_length) {
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error read_data_from_socket for response->binary_body");
}
} else {
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error allocating resources for response->binary_body");
Expand Down Expand Up @@ -1127,16 +1102,21 @@ int ulfius_websocket_send_fragmented_message(struct _websocket_manager * websock
if (ulfius_send_websocket_message_managed(websocket_manager, U_WEBSOCKET_OPCODE_CLOSE, 0, NULL, 0) == U_OK) {
// If message sent is U_WEBSOCKET_OPCODE_CLOSE, wait for the close response for WEBSOCKET_MAX_CLOSE_TRY messages max, then close the connection
do {
message = NULL;
ret_message = ulfius_read_incoming_message(websocket_manager, &message);
if (ret_message == U_OK && message != NULL) {
if (ulfius_push_websocket_message(websocket_manager->message_list_incoming, message) != U_OK) {
y_log_message(Y_LOG_LEVEL_ERROR, "Error pushing new websocket message in list");
if (is_websocket_data_available(websocket_manager)) {
message = NULL;
ret_message = ulfius_read_incoming_message(websocket_manager, &message);
if (ret_message == U_OK && message != NULL) {
if (message->opcode == U_WEBSOCKET_OPCODE_CLOSE) {
websocket_manager->connected = 0;
}
if (ulfius_push_websocket_message(websocket_manager->message_list_incoming, message) != U_OK) {
y_log_message(Y_LOG_LEVEL_ERROR, "Error pushing new websocket message in list");
}
} else {
websocket_manager->connected = 0;
}
} else {
websocket_manager->connected = 0;
}
} while (websocket_manager->connected && message->opcode != U_WEBSOCKET_OPCODE_CLOSE && (count-- > 0));
} while (websocket_manager->connected && (count-- > 0));
} else {
y_log_message(Y_LOG_LEVEL_ERROR, "Ulfius - Error sending U_WEBSOCKET_OPCODE_CLOSE message");
}
Expand Down Expand Up @@ -1492,7 +1472,7 @@ int ulfius_set_websocket_request(struct _u_request * request,
u_map_put(request->map_header, "Upgrade", "websocket");
u_map_put(request->map_header, "Connection", "Upgrade");
u_map_put(request->map_header, "Content-Length", "0");
u_map_put(request->map_header, "User-Agent", "Ulfius Websocket Client Framework");
u_map_put(request->map_header, "User-Agent", U_WEBSOCKET_USER_AGENT "/" STR(ULFIUS_VERSION));
srand(time(NULL));
sprintf(rand_int, "%04d%04d%04d%04d", rand(), rand(), rand(), rand());
if (!o_base64_encode((unsigned char *)rand_int, 16, (unsigned char *)rand_int_base64, &out_len)) {
Expand Down

0 comments on commit 1887a79

Please sign in to comment.