diff --git a/bin_interface.c b/bin_interface.c
index cb78f0efea0..4bfbfa5ef4c 100644
--- a/bin_interface.c
+++ b/bin_interface.c
@@ -32,12 +32,11 @@ struct socket_info *bin;
int bin_children = 1;
-static int child_index;
static char *send_buffer;
static char *cpos;
-static char rcv_buf[BUF_SIZE];
+static char *rcv_buf;
static char *rcv_end;
static struct packet_cb_list *reg_modules;
@@ -53,6 +52,13 @@ static struct packet_cb_list *reg_modules;
*
* @param: { LEN, MOD_NAME } + CMD
*/
+
+void set_len(char *send_buffer, char *cpos){
+ unsigned short len = cpos - send_buffer, *px;
+ px = (unsigned short *) (send_buffer + BIN_PACKET_MARKER_SIZE);
+ *px = len;
+}
+
int bin_init(str *mod_name, int cmd_type)
{
if (!send_buffer) {
@@ -75,6 +81,7 @@ int bin_init(str *mod_name, int cmd_type)
memcpy(cpos, &cmd_type, sizeof(cmd_type));
cpos += sizeof(cmd_type);
+ set_len(send_buffer, cpos);
return 0;
}
@@ -103,6 +110,7 @@ int bin_push_str(const str *info)
cpos += LEN_FIELD_SIZE;
memcpy(cpos, info->s, info->len);
cpos += info->len;
+ set_len(send_buffer, cpos);
return (int)LEN_FIELD_SIZE + info->len;
}
@@ -122,9 +130,22 @@ int bin_push_int(int info)
memcpy(cpos, &info, sizeof(info));
cpos += sizeof(info);
+ set_len(send_buffer, cpos);
+
return sizeof(info);
}
+int bin_get_buffer(str *buffer)
+{
+ if (!buffer)
+ return -1;
+
+ buffer->s = send_buffer;
+ buffer->len = bin_send_size;
+
+ return 1;
+}
+
/*
* skips @count integers from the current position in the received binary packet
*
@@ -137,11 +158,6 @@ int bin_skip_int(int count)
int i;
char *in = cpos;
- if (child_index == 0) {
- LM_ERR("Non bin processes cannot do pop operations!\n");
- return -2;
- }
-
for (i = 0; i < count; i++) {
if (cpos + LEN_FIELD_SIZE > rcv_end) {
LM_ERR("Receive binary packet buffer overflow");
@@ -166,10 +182,6 @@ int bin_skip_str(int count)
int i, len;
char *in = cpos;
- if (child_index == 0) {
- LM_ERR("Non bin processes cannot do pop operations!\n");
- return -2;
- }
for (i = 0; i < count; i++) {
if (cpos + LEN_FIELD_SIZE > rcv_end)
@@ -208,11 +220,6 @@ int bin_pop_str(str *info)
if (cpos == rcv_end)
return 1;
- if (child_index == 0) {
- LM_ERR("Non bin processes cannot do pop operations!\n");
- return -2;
- }
-
if (cpos + LEN_FIELD_SIZE > rcv_end)
goto error;
@@ -252,10 +259,6 @@ int bin_pop_int(void *info)
if (cpos == rcv_end)
return 1;
- if (child_index == 0) {
- LM_ERR("Non bin processes cannot do pop operations!\n");
- return -2;
- }
if (cpos + sizeof(int) > rcv_end) {
LM_ERR("Receive binary packet buffer overflow");
@@ -336,136 +339,41 @@ int bin_register_cb(char *mod_name, void (*cb)(int, struct receive_info *))
return 0;
}
-static int has_valid_checksum(char *buf, int len)
-{
- unsigned int crc, real_crc;
- str st;
-
- crc = *(unsigned int *)(buf + BIN_PACKET_MARKER_SIZE);
-
- st.s = buf + HEADER_SIZE;
- st.len = len - HEADER_SIZE;
-
- crc32_uint(&st, &real_crc);
-
- return crc == real_crc;
-}
/*
* main binary packet UDP receiver loop
*/
-static void bin_receive_loop(void)
-{
- struct receive_info ri;
- socklen_t addrlen;
- struct packet_cb_list *p;
- str name;
- int rcv_bytes;
-
- ri.bind_address = bind_address;
- ri.dst_port = bind_address->port_no;
- ri.dst_ip = bind_address->address;
- ri.proto = PROTO_UDP;
- ri.proto_reserved1 = ri.proto_reserved2 = 0;
-
- for (;;) {
- addrlen = sizeof ri.src_su;
- rcv_bytes = recvfrom(bind_address->socket, rcv_buf, BUF_SIZE,
- 0, &ri.src_su.s, &addrlen);
- if (rcv_bytes == -1) {
- if (errno == EAGAIN) {
- LM_DBG("packet with bad checksum received\n");
- continue;
- }
-
- LM_ERR("recvfrom: [%d] %s\n", errno, strerror(errno));
- if (errno == EINTR || errno == EWOULDBLOCK || errno == ECONNREFUSED)
- continue;
-
- return;
- }
-
- if (addrlen > sizeof ri.src_su)
- LM_CRIT("src addr truncated! (%u -> %zu)\n", addrlen, sizeof ri.src_su);
-
- rcv_end = rcv_buf + rcv_bytes;
-
- if (rcv_bytes < MIN_BIN_PACKET_SIZE) {
- LM_INFO("received invalid packet: len = %d\n", rcv_bytes);
- continue;
- }
- if (!is_valid_bin_packet(rcv_buf)) {
- LM_WARN("Invalid binary packet header! First 10 bytes: %.*s\n",
- 10, rcv_buf);
- continue;
- }
- if (!has_valid_checksum(rcv_buf, rcv_bytes)) {
- LM_WARN("binary packet checksum test failed!\n");
- continue;
- }
+void call_callbacks(char* buffer, struct receive_info *rcv){
+ str name;
+ struct packet_cb_list *p;
+ rcv_buf = buffer;
+ get_name(rcv_buf, name);
+ rcv_end = rcv_buf + *(unsigned short*)(buffer + BIN_PACKET_MARKER_SIZE);
- get_name(rcv_buf, name);
- cpos = name.s + name.len + CMD_FIELD_SIZE;
+ cpos = name.s + name.len + CMD_FIELD_SIZE;
- /* packet will be now processed by a specific module */
- for (p = reg_modules; p; p = p->next) {
- if (p->module.len == name.len &&
- memcmp(name.s, p->module.s, name.len) == 0) {
+ /* packet will be now processed by a specific module */
+ for (p = reg_modules; p; p = p->next) {
+ if (p->module.len == name.len &&
+ memcmp(name.s, p->module.s, name.len) == 0) {
- LM_DBG("binary Packet CMD: %d. Module: %.*s\n",
- bin_rcv_type, name.len, name.s);
+ LM_DBG("binary Packet CMD: %d. Module: %.*s\n",
+ bin_rcv_type, name.len, name.s);
- p->cbf(bin_rcv_type, &ri);
+ p->cbf(bin_rcv_type, rcv);
- break;
- }
+ break;
}
}
}
+
/*
* called in the OpenSIPS initialization phase by the main process.
* forks the binary packet UDP receivers.
*
* @return: 0 on success
*/
-int start_bin_receivers(void)
-{
- pid_t pid;
- int i;
-
- if (udp_init_listener(bin, 0) != 0)
- return -1;
-
- for (i = 1; i <= bin_children; i++) {
- if ((pid = internal_fork("BIN receiver")) < 0) {
- LM_CRIT("Cannot fork binary packet receiver process!\n");
- return -1;
- }
-
- if (pid == 0) {
- LM_DBG("CHILD sock: %d\n", bin->socket);
-
- child_index = i;
- set_proc_attrs("BIN receiver %.*s ",
- bin->sock_str.len,
- bin->sock_str.s);
- bind_address = bin;
-
- if (init_child(PROC_BIN) < 0) {
- LM_ERR("init_child failed for BIN listener\n");
- report_failure_status();
- exit(-1);
- }
-
- bin_receive_loop();
- exit(-1);
- } else
- LM_DBG("PARENT sock: %d\n", bin->socket);
- }
-
- return 0;
-}
diff --git a/bin_interface.h b/bin_interface.h
index cb21bb7effa..2fb4c933ddc 100644
--- a/bin_interface.h
+++ b/bin_interface.h
@@ -69,6 +69,13 @@ struct packet_cb_list {
};
+/**
+ calls all the registered functions
+
+ @buffer: buffer containing a complete bin message
+ @rcv: information about the sender of the message
+ */
+void call_callbacks(char* buffer, struct receive_info *rcv);
/*
* registers a callback function to be triggered on a received
* binary packet marked with the @mod_name module name
@@ -105,6 +112,9 @@ int bin_push_str(const str *info);
*/
int bin_push_int(int info);
+/* TODO - comment, lol */
+int bin_get_buffer(str *buffer);
+
/*
* pops a str structure from a received binary packet
* @info: pointer to store the result
diff --git a/cfg.y b/cfg.y
index bb25e900c0a..6b1fb275a9f 100644
--- a/cfg.y
+++ b/cfg.y
@@ -884,12 +884,13 @@ assign_stm: DEBUG EQUAL snumber {
"expected (use quotes if the hostname includes"
" config keywords)"); }
| BIN_LISTEN EQUAL listen_id COLON port {
+ // TODO - think it should be remoevd
if (bin) {
yyerror("can only define one binary packet interface");
YYABORT;
}
- lst_tmp = mk_listen_id($3, PROTO_UDP, $5);
+ lst_tmp = mk_listen_id($3, PROTO_BIN, $5);
bin = new_sock_info(lst_tmp->name,
lst_tmp->port,
lst_tmp->proto,
diff --git a/ip_addr.h b/ip_addr.h
index 143340d13c7..225ff89783d 100644
--- a/ip_addr.h
+++ b/ip_addr.h
@@ -50,7 +50,7 @@
#define BUFFER_INCREMENT 2048
enum sip_protos { PROTO_NONE = 0, PROTO_FIRST = 1, PROTO_UDP = 1, \
- PROTO_TCP, PROTO_TLS, PROTO_SCTP, PROTO_WS, PROTO_OTHER };
+ PROTO_TCP, PROTO_TLS, PROTO_SCTP, PROTO_WS, PROTO_BIN, PROTO_OTHER };
#define PROTO_LAST PROTO_OTHER
struct ip_addr{
diff --git a/main.c b/main.c
index f9d7bee440c..5a3f2cd9f14 100644
--- a/main.c
+++ b/main.c
@@ -701,17 +701,6 @@ static int main_loop(void)
*startup_done = 0;
}
- if (fix_socket_list(&bin) != 0) {
- LM_ERR("failed to initialize binary interface socket list!\n");
- goto error;
- }
-
- /* OpenSIPS <--> OpenSIPS communication interface */
- if (bin && start_bin_receivers() != 0) {
- LM_CRIT("cannot start binary interface receiver processes!\n");
- goto error;
- }
-
/* fork for the timer process*/
if (start_timer_processes()!=0) {
LM_CRIT("cannot start timer process(es)\n");
diff --git a/modules/proto_bin/Makefile b/modules/proto_bin/Makefile
new file mode 100644
index 00000000000..1527f18a966
--- /dev/null
+++ b/modules/proto_bin/Makefile
@@ -0,0 +1,6 @@
+include ../../Makefile.defs
+auto_gen=
+NAME=proto_bin.so
+LIBS=
+
+include ../../Makefile.modules
\ No newline at end of file
diff --git a/modules/proto_bin/README b/modules/proto_bin/README
new file mode 100644
index 00000000000..d24144eabc4
--- /dev/null
+++ b/modules/proto_bin/README
@@ -0,0 +1,170 @@
+proto_bin Module
+
+Ionel Cerghit
+
+ OpenSIPS Solutions
+
+ Copyright © 2015 OpenSIPS Project
+ __________________________________________________________
+
+ Table of Contents
+
+ 1. Admin Guide
+
+ 1.1. Overview
+ 1.2. Dependencies
+
+ 1.2.1. OpenSIPS Modules
+ 1.2.2. External Libraries or Applications
+
+ 1.3. Exported Parameters
+
+ 1.3.1. bin_port (integer)
+ 1.3.2. bin_port_send_timeout (integer)
+ 1.3.3. bin_max_msg_chunks (integer)
+ 1.3.4. bin_async (integer)
+ 1.3.5. bin_async_max_postponed_chunks (integer)
+ 1.3.6. bin_async_local_connect_timeout (integer)
+ 1.3.7. bin_async_local_write_timeout (integer)
+
+ List of Examples
+
+ 1.1. Set bin_port parameter
+ 1.2. Set bin_send_timeout parameter
+ 1.3. Set bin_max_msg_chunks parameter
+ 1.4. Set bin_async parameter
+ 1.5. Set bin_async_max_postponed_chunks parameter
+ 1.6. Set bin_async_local_connect_timeout parameter
+ 1.7. Set bin_async_local_write_timeout parameter
+
+Chapter 1. Admin Guide
+
+1.1. Overview
+
+ The proto_bin module is a transport module which implements BIN
+ TCP-based communication. It does not handle TCP connections
+ management, but only offers higher-level primitives to read and
+ write BIN messages over TCP. It calls registered callback
+ functions for every complete message received.
+
+ Once loaded, you will be able to define BIN listeners in your
+ script, by adding its IP, and optionally the listening port, in
+ your configuration file, similar to this example:
+
+...
+listen=bin:127.0.0.1 # change the listening IP
+listen=bin:127.0.0.1:5080 # change with the listening IP and port
+...
+
+1.2. Dependencies
+
+1.2.1. OpenSIPS Modules
+
+ The following modules must be loaded before this module:
+ * None.
+
+1.2.2. External Libraries or Applications
+
+ The following libraries or applications must be installed
+ before running OpenSIPS with this module loaded:
+ * None.
+
+1.3. Exported Parameters
+
+1.3.1. bin_port (integer)
+
+ The default port to be used by all TCP listeners.
+
+ Default value is 5555.
+
+ Example 1.1. Set bin_port parameter
+...
+modparam("proto_bin", "bin_port", 6666)
+...
+
+1.3.2. bin_port_send_timeout (integer)
+
+ Time in milliseconds after a TCP connection will be closed if
+ it is not available for blocking writing in this interval (and
+ OpenSIPS wants to send something on it).
+
+ Default value is 100 ms.
+
+ Example 1.2. Set bin_send_timeout parameter
+...
+modparam("proto_bin", "bin_send_timeout", 200)
+...
+
+1.3.3. bin_max_msg_chunks (integer)
+
+ The maximum number of chunks that a BIN message is expected to
+ arrive via TCP. If a packet is received more fragmented than
+ this, the connection is dropped (either the connection is very
+ overloaded and this leads to high fragmentation - or we are the
+ victim of an ongoing attack where the attacker is sending the
+ traffic very fragmented in order to decrease our performance).
+
+ Default value is 32.
+
+ Example 1.3. Set bin_max_msg_chunks parameter
+...
+modparam("proto_bin", "bin_max_msg_chunks", 8)
+...
+
+1.3.4. bin_async (integer)
+
+ If the TCP connect and write operations should be done in an
+ asynchronous mode (non-blocking connect and write). If
+ disabled, OpenSIPS will block and wait for TCP operations like
+ connect and write.
+
+ Default value is 1 (enabled).
+
+ Example 1.4. Set bin_async parameter
+...
+modparam("proto_bin", "bin_async", 0)
+...
+
+1.3.5. bin_async_max_postponed_chunks (integer)
+
+ If bin_async is enabled, this specifies the maximum number of
+ BIN messages that can be stashed for later/async writing. If
+ the connection pending writes exceed this number, the
+ connection will be marked as broken and dropped.
+
+ Default value is 32.
+
+ Example 1.5. Set bin_async_max_postponed_chunks parameter
+...
+modparam("proto_bin", "bin_async_max_postponed_chunks", 16)
+...
+
+1.3.6. bin_async_local_connect_timeout (integer)
+
+ If bin_async is enabled, this specifies the number of
+ milliseconds that a connect will be tried in blocking mode
+ (optimization). If the connect operation lasts more than this,
+ the connect will go to async mode and will be passed to TCP
+ MAIN for polling.
+
+ Default value is 100 ms.
+
+ Example 1.6. Set bin_async_local_connect_timeout parameter
+...
+modparam("proto_bin", "bin_async_local_connect_timeout", 200)
+...
+
+1.3.7. bin_async_local_write_timeout (integer)
+
+ If bin_async is enabled, this specifies the number of
+ milliseconds that a write op will be tried in blocking mode
+ (optimization). If the write operation lasts more than this,
+ the write will go to async mode and will be passed to bin MAIN
+ for polling.
+
+ Default value is 10 ms.
+
+ Example 1.7. Set bin_async_local_write_timeout parameter
+...
+modparam("proto_bin", "tcp_async_local_write_timeout", 100)
+...
diff --git a/modules/proto_bin/doc/proto_bin.xml b/modules/proto_bin/doc/proto_bin.xml
new file mode 100644
index 00000000000..5b6cc9dd3b4
--- /dev/null
+++ b/modules/proto_bin/doc/proto_bin.xml
@@ -0,0 +1,40 @@
+
+
+
+
+
+%docentities;
+
+]>
+
+
+
+ proto_bin Module
+ &osipsname;
+
+
+ Ionel
+ Cerghit
+ OpenSIPS Solutions
+
+ ionel.cerghit@gmail.com
+
+ http://www.opensips.org
+
+
+
+
+
+ 2015
+ &osips; Project
+
+
+
+
+ &admin;
+
+
diff --git a/modules/proto_bin/doc/proto_bin_admin.xml b/modules/proto_bin/doc/proto_bin_admin.xml
new file mode 100644
index 00000000000..058d6d762c5
--- /dev/null
+++ b/modules/proto_bin/doc/proto_bin_admin.xml
@@ -0,0 +1,222 @@
+
+
+
+
+ &adminguide;
+
+
+ Overview
+
+ The proto_bin module is a
+ transport module which implements BIN TCP-based communication. It does
+ not handle TCP connections management, but only offers higher-level
+ primitives to read and write BIN messages over TCP. It calls registered
+ callback functions for every complete message received.
+
+
+
+ Once loaded, you will be able to define BIN listeners in your script,
+ by adding its IP, and optionally the listening port, in your configuration
+ file, similar to this example:
+
+
+...
+listen=bin:127.0.0.1 # change the listening IP
+listen=bin:127.0.0.1:5080 # change with the listening IP and port
+...
+
+
+
+
+
+ Dependencies
+
+ &osips; Modules
+
+ The following modules must be loaded before this module:
+
+
+
+ None.
+
+
+
+
+
+
+
+ External Libraries or Applications
+
+ The following libraries or applications must be installed before
+ running &osips; with this module loaded:
+
+
+
+ None.
+
+
+
+
+
+
+
+
+ Exported Parameters
+
+ bin_port (integer)
+
+ The default port to be used by all TCP listeners.
+
+
+
+ Default value is 5555.
+
+
+
+ Set bin_port parameter
+
+...
+modparam("proto_bin", "bin_port", 6666)
+...
+
+
+
+
+
+ bin_port_send_timeout (integer)
+
+ Time in milliseconds after a TCP connection will be closed if it is
+ not available for blocking writing in this interval (and &osips; wants
+ to send something on it).
+
+
+
+ Default value is 100 ms.
+
+
+
+ Set bin_send_timeout parameter
+
+...
+modparam("proto_bin", "bin_send_timeout", 200)
+...
+
+
+
+
+ bin_max_msg_chunks (integer)
+
+ The maximum number of chunks that a BIN message is expected to
+ arrive via TCP. If a packet is received more fragmented than this,
+ the connection is dropped (either the connection is very
+ overloaded and this leads to high fragmentation - or we are the
+ victim of an ongoing attack where the attacker is sending the
+ traffic very fragmented in order to decrease our performance).
+
+
+
+ Default value is 32.
+
+
+
+ Set bin_max_msg_chunks parameter
+
+...
+modparam("proto_bin", "bin_max_msg_chunks", 8)
+...
+
+
+
+
+ bin_async (integer)
+
+ If the TCP connect and write operations should be done in an
+ asynchronous mode (non-blocking connect and
+ write). If disabled, OpenSIPS will block and wait for TCP
+ operations like connect and write.
+
+
+
+ Default value is 1 (enabled).
+
+
+
+ Set bin_async parameter
+
+...
+modparam("proto_bin", "bin_async", 0)
+...
+
+
+
+
+ bin_async_max_postponed_chunks (integer)
+
+ If bin_async is enabled, this specifies the
+ maximum number of BIN messages that can be stashed for later/async
+ writing. If the connection pending writes exceed this number, the
+ connection will be marked as broken and dropped.
+
+
+
+ Default value is 32.
+
+
+
+ Set bin_async_max_postponed_chunks parameter
+
+...
+modparam("proto_bin", "bin_async_max_postponed_chunks", 16)
+...
+
+
+
+
+ bin_async_local_connect_timeout (integer)
+
+ If bin_async is enabled, this specifies the
+ number of milliseconds that a connect will be tried in blocking
+ mode (optimization). If the connect operation lasts more than
+ this, the connect will go to async mode and will be passed to TCP
+ MAIN for polling.
+
+
+
+ Default value is 100 ms.
+
+
+
+ Set bin_async_local_connect_timeout parameter
+
+...
+modparam("proto_bin", "bin_async_local_connect_timeout", 200)
+...
+
+
+
+
+ bin_async_local_write_timeout (integer)
+
+ If bin_async is enabled, this specifies the
+ number of milliseconds that a write op will be tried in blocking
+ mode (optimization). If the write operation lasts more than this,
+ the write will go to async mode and will be passed to bin MAIN for
+ polling.
+
+
+
+ Default value is 10 ms.
+
+
+
+ Set bin_async_local_write_timeout parameter
+
+...
+modparam("proto_bin", "tcp_async_local_write_timeout", 100)
+...
+
+
+
+
+
+
diff --git a/modules/proto_bin/proto_bin.c b/modules/proto_bin/proto_bin.c
new file mode 100644
index 00000000000..be3a5138bd2
--- /dev/null
+++ b/modules/proto_bin/proto_bin.c
@@ -0,0 +1,910 @@
+/*
+ * Copyright (C) 2015 - OpenSIPS Foundation
+ * Copyright (C) 2001-2003 FhG Fokus
+ *
+ * This file is part of opensips, a free SIP server.
+ *
+ * opensips is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * opensips is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ *
+ * History:
+ * -------
+ * 2015-01-09 first version (razvanc)
+ */
+#include
+#include
+ #include
+#include
+
+#include "../../timer.h"
+#include "../../sr_module.h"
+#include "../../net/api_proto.h"
+#include "../../net/api_proto_net.h"
+#include "../../net/net_tcp.h"
+#include "../../socket_info.h"
+#include "../../tsend.h"
+#include "../../net/proto_tcp/tcp_common_defs.h"
+#include "../../pt.h"
+#include "../../bin_interface.h"
+#include "proto_bin.h"
+#include "../../ut.h"
+
+
+static int mod_init(void);
+static int proto_bin_init(struct proto_info *pi);
+static int proto_bin_init_listener(struct socket_info *si);
+static int proto_bin_send(struct socket_info* send_sock,
+ char* buf, unsigned int len, union sockaddr_union* to, int id);
+static int bin_read_req(struct tcp_connection* con, int* bytes_read);
+static int bin_write_async_req(struct tcp_connection* con,int fd);
+static int bin_conn_init(struct tcp_connection* c);
+static void bin_conn_clean(struct tcp_connection* c);
+
+
+static int bin_port = 5555;
+static int bin_send_timeout = 100;
+static struct tcp_req bin_current_req;
+static int bin_max_msg_chunks = 32;
+static int bin_async = 1;
+static int bin_async_max_postponed_chunks = 32;
+static int bin_async_local_connect_timeout = 100;
+static int bin_async_local_write_timeout = 10;
+
+struct bin_send_chunk {
+ char *buf; /* buffer that needs to be sent out */
+ char *pos; /* the position that we should be writing next */
+ int len; /* length of the buffer */
+ int ticks; /* time at which this chunk was initially
+ attempted to be written */
+};
+
+struct bin_data {
+ /* the chunks that need to be written on this
+ * connection when it will become writable */
+ struct bin_send_chunk **async_chunks;
+ /* the total number of chunks pending to be written */
+ int async_chunks_no;
+ /* the oldest chunk in our write list */
+ int oldest_chunk;
+};
+
+static cmd_export_t cmds[] = {
+ {"proto_init", (cmd_function)proto_bin_init, 0, 0, 0, 0},
+ {0,0,0,0,0,0}
+};
+
+static param_export_t params[] = {
+ { "bin_port", INT_PARAM, &bin_port },
+ { "bin_send_timeout", INT_PARAM, &bin_send_timeout },
+ { "bin_max_msg_chunks", INT_PARAM, &bin_max_msg_chunks },
+ { "bin_async", INT_PARAM, &bin_async },
+ { "bin_async_max_postponed_chunks", INT_PARAM,
+ &bin_async_max_postponed_chunks },
+ { "bin_async_local_connect_timeout", INT_PARAM,
+ &bin_async_local_connect_timeout},
+ { "bin_async_local_write_timeout", INT_PARAM,
+ &bin_async_local_write_timeout },
+ {0, 0, 0}
+};
+
+struct module_exports exports = {
+ PROTO_PREFIX "bin", /* module name*/
+ MOD_TYPE_DEFAULT,/* class of this module */
+ MODULE_VERSION,
+ DEFAULT_DLFLAGS, /* dlopen flags */
+ NULL, /* OpenSIPS module dependencies */
+ cmds, /* exported functions */
+ 0, /* exported async functions */
+ params, /* module parameters */
+ 0, /* exported statistics */
+ 0, /* exported MI functions */
+ 0, /* exported pseudo-variables */
+ 0, /* extra processes */
+ mod_init, /* module initialization function */
+ 0, /* response function */
+ 0, /* destroy function */
+ 0, /* per-child init function */
+};
+
+static int proto_bin_init(struct proto_info *pi) {
+ pi->default_port = bin_port;
+
+ pi->tran.init_listener = proto_bin_init_listener;
+ pi->tran.send = proto_bin_send;
+ pi->tran.dst_attr = tcp_conn_fcntl;
+
+ pi->net.flags = PROTO_NET_USE_TCP;
+ pi->net.read = (proto_net_read_f)bin_read_req;
+ pi->net.write = (proto_net_write_f)bin_write_async_req;
+
+ if (bin_async != 0) {
+ pi->net.conn_init = bin_conn_init;
+ pi->net.conn_clean = bin_conn_clean;
+ }
+
+ return 0;
+}
+
+
+static int mod_init(void)
+{
+ LM_INFO("initializing BIN protocol\n");
+
+ return 0;
+}
+
+
+static int bin_conn_init(struct tcp_connection* c)
+{
+ struct bin_data *d;
+
+ /* allocate the tcp_data and the array of chunks as a single mem chunk */
+ d = (struct bin_data*)shm_malloc( sizeof(struct bin_data) +
+ sizeof(struct bin_send_chunk *) * bin_async_max_postponed_chunks );
+ if (d == NULL) {
+ LM_ERR("failed to create tcp chunks in shm mem\n");
+ return -1;
+ }
+
+ d->async_chunks = (struct bin_send_chunk **)(d+1);
+ d->async_chunks_no = 0;
+ d->oldest_chunk = 0;
+
+ c->proto_data = (void*)d;
+ return 0;
+}
+
+static void bin_conn_clean(struct tcp_connection* c)
+{
+ struct bin_data *d = (struct bin_data*)c->proto_data;
+ int r;
+
+ for (r = 0; r < d->async_chunks_no; r++) {
+ shm_free(d->async_chunks[r]);
+ }
+
+ shm_free(d);
+
+ c->proto_data = NULL;
+}
+
+static int proto_bin_init_listener(struct socket_info *si)
+{
+ /* we do not do anything particular, so
+ * transparently use the generic listener init from net TCP layer */
+ return tcp_init_listener(si);
+}
+
+
+
+
+
+static int add_write_chunk(struct tcp_connection *con,char *buf,int len,
+ int lock)
+{
+ struct bin_send_chunk *c;
+ struct bin_data *d = (struct bin_data*)con->proto_data;
+
+ c = shm_malloc(sizeof(struct bin_send_chunk) + len);
+ if (!c) {
+ LM_ERR("No more SHM\n");
+ return -1;
+ }
+
+ c->len = len;
+ c->ticks = get_ticks();
+ c->buf = (char *)(c+1);
+ memcpy(c->buf,buf,len);
+ c->pos = c->buf;
+
+ if (lock)
+ lock_get(&con->write_lock);
+
+ if (d->async_chunks_no == bin_async_max_postponed_chunks) {
+ LM_ERR("We have reached the limit of max async postponed chunks\n");
+ if (lock)
+ lock_release(&con->write_lock);
+ shm_free(c);
+ return -2;
+ }
+
+ d->async_chunks[d->async_chunks_no++] = c;
+ if (d->async_chunks_no == 1)
+ d->oldest_chunk = c->ticks;
+
+ if (lock)
+ lock_release(&con->write_lock);
+
+ return 0;
+}
+
+static int async_tsend_stream(struct tcp_connection *c,
+ int fd, char* buf, unsigned int len, int timeout)
+{
+ int written;
+ int n;
+ struct pollfd pf;
+
+ pf.fd=fd;
+ pf.events=POLLOUT;
+ written=0;
+
+again:
+ goto async_write;
+ n=send(fd, buf, len,0);
+
+ if (n<0){
+ if (errno==EINTR) goto again;
+ else if (errno!=EAGAIN && errno!=EWOULDBLOCK) {
+ LM_ERR("Failed first TCP async send : (%d) %s\n",
+ errno, strerror(errno));
+ return -1;
+ } else
+ goto poll_loop;
+ }
+
+ written+=n;
+ if (n < len) {
+ /* partial write */
+ buf += n;
+ len -= n;
+ } else {
+ /* succesful write from the first try */
+ LM_DBG("Async succesful write from first try on %p\n",c);
+ return len;
+ }
+
+poll_loop:
+ n = poll(&pf,1,timeout);
+ if (n<0) {
+ if (errno==EINTR)
+ goto poll_loop;
+ LM_ERR("Polling while trying to async send failed %s [%d]\n",
+ strerror(errno), errno);
+ return -1;
+ } else if (n == 0) {
+ async_write:
+
+ LM_DBG("timeout -> do an async write (add it to conn)\n");
+ /* timeout - let's just pass to main */
+ if (add_write_chunk(c,buf,len,0) < 0) {
+ LM_ERR("Failed to add write chunk to connection \n");
+ return -1;
+ } else {
+ /* we have succesfully added async write chunk
+ * tell MAIN to poll out for us */
+ LM_DBG("Data still pending for write on conn %p\n",c);
+ return 0;
+ }
+ }
+
+ if (pf.events&POLLOUT)
+ goto again;
+
+ /* some other events triggered by poll - treat as errors */
+ return -1;
+}
+
+static struct tcp_connection* bin_sync_connect(struct socket_info* send_sock,
+ union sockaddr_union* server, int *fd)
+{
+ int s;
+ union sockaddr_union my_name;
+ socklen_t my_name_len;
+ struct tcp_connection* con;
+
+ s=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
+ if (s==-1){
+ LM_ERR("socket: (%d) %s\n", errno, strerror(errno));
+ goto error;
+ }
+ if (tcp_init_sock_opt(s)<0){
+ LM_ERR("tcp_init_sock_opt failed\n");
+ goto error;
+ }
+ my_name_len = sockaddru_len(send_sock->su);
+ memcpy( &my_name, &send_sock->su, my_name_len);
+ su_setport( &my_name, 0);
+ if (bind(s, &my_name.s, my_name_len )!=0) {
+ LM_ERR("bind failed (%d) %s\n", errno,strerror(errno));
+ goto error;
+ }
+
+ if (tcp_connect_blocking(s, &server->s, sockaddru_len(*server))<0){
+ LM_ERR("tcp_blocking_connect failed\n");
+ goto error;
+ }
+ con = tcp_conn_create(s, server, send_sock, S_CONN_OK);
+ if (con==NULL){
+ LM_ERR("tcp_conn_create failed, closing the socket\n");
+ goto error;
+ }
+ *fd = s;
+ return con;
+ /*FIXME: set sock idx! */
+error:
+ /* close the opened socket */
+ if (s!=-1) close(s);
+ return 0;
+}
+
+static int tcpconn_async_connect(struct socket_info* send_sock,
+ union sockaddr_union* server, char *buf, unsigned len,
+ struct tcp_connection** c, int *ret_fd)
+{
+ int fd, n;
+ union sockaddr_union my_name;
+ socklen_t my_name_len;
+ struct tcp_connection* con;
+
+ struct pollfd pf;
+
+ unsigned int elapsed,to;
+ int err;
+ unsigned int err_len;
+ int poll_err;
+ char *ip;
+ unsigned short port;
+ struct timeval begin;
+
+ /* create the socket */
+ fd = socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
+ if (fd == -1){
+ LM_ERR("socket: (%d) %s\n", errno, strerror(errno));
+ return -1;
+ }
+ if (tcp_init_sock_opt(fd)<0){
+ LM_ERR("tcp_init_sock_opt failed\n");
+ goto error;
+ }
+ my_name_len = sockaddru_len(send_sock->su);
+ memcpy( &my_name, &send_sock->su, my_name_len);
+ su_setport( &my_name, 0);
+ if (bind(fd, &my_name.s, my_name_len )!=0) {
+ LM_ERR("bind failed (%d) %s\n", errno,strerror(errno));
+ goto error;
+ }
+
+ /* attempt to do connect and see if we do block or not */
+ poll_err = 0;
+ elapsed = 0;
+ to = bin_async_local_connect_timeout*1000;
+
+ if (gettimeofday(&(begin), NULL)) {
+ LM_ERR("Failed to get TCP connect start time\n");
+ goto error;
+ }
+
+again:
+ n = connect(fd, &server->s, sockaddru_len(*server));
+ goto async_connect;
+ if (n == -1) {
+ if (errno == EINTR){
+ elapsed=get_time_diff(&begin);
+ if (elapsed < to) goto again;
+ else {
+ LM_DBG("Local connect attempt failed \n");
+ goto async_connect;
+ }
+ }
+ if (errno != EINPROGRESS && errno!=EALREADY) {
+ get_su_info(&server->s, ip, port);
+ LM_ERR("[server=%s:%d] (%d) %s\n",ip, port, errno,strerror(errno));
+ goto error;
+ }
+ } else goto local_connect;
+
+ /* let's poll for a little */
+
+ pf.fd = fd;
+ pf.events = POLLOUT;
+
+ while(1){
+ elapsed = get_time_diff(&begin);
+ if (elapsed < to)
+ to -= elapsed;
+ else {
+ LM_DBG("Polling is overdue \n");
+ goto async_connect;
+ }
+
+ n = poll(&pf, 1, to/1000);
+
+ if (n < 0){
+ if (errno == EINTR) continue;
+ get_su_info(&server->s, ip, port);
+ LM_ERR("poll/select failed:[server=%s:%d] (%d) %s\n",
+ ip, port, errno, strerror(errno));
+ goto error;
+ } else if (n==0) /* timeout */ continue;
+
+ if (pf.revents & (POLLERR|POLLHUP|POLLNVAL)){
+ LM_ERR("poll error: flags %x\n", pf.revents);
+ poll_err=1;
+ }
+
+
+ err_len=sizeof(err);
+ getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &err_len);
+ if ((err==0) && (poll_err==0)) goto local_connect;
+ if (err!=EINPROGRESS && err!=EALREADY){
+ get_su_info(&server->s, ip, port);
+ LM_ERR("failed to retrieve SO_ERROR [server=%s:%d] (%d) %s\n",
+ ip, port, err, strerror(err));
+ goto error;
+ }
+ }
+
+async_connect:
+ LM_DBG("Create connection for async connect\n");
+ /* create a new dummy connection */
+ con = tcp_conn_create(fd, server, send_sock, S_CONN_CONNECTING);
+ if (con==NULL) {
+ LM_ERR("tcp_conn_create failed\n");
+ goto error;
+ }
+ /* attach the write buffer to it */
+ lock_get(&con->write_lock);
+ if (add_write_chunk(con,buf,len,0) < 0) {
+ LM_ERR("Failed to add the initial write chunk\n");
+ /* FIXME - seems no more SHM now ...
+ * continue the async connect process ? */
+ }
+
+ lock_release(&con->write_lock);
+ /* report an async, in progress connect */
+ *c = con;
+ return 0;
+
+local_connect:
+ con = tcp_conn_create(fd, server, send_sock, S_CONN_OK);
+ if (con==NULL) {
+ LM_ERR("tcp_conn_create failed, closing the socket\n");
+ goto error;
+ }
+ *c = con;
+ *ret_fd = fd;
+ /* report a local connect */
+ return 1;
+
+error:
+ close(fd);
+ *c = NULL;
+ return -1;
+}
+
+inline static int _bin_write_on_socket(struct tcp_connection *c, int fd,
+ char *buf, int len){
+ int n;
+
+ lock_get(&c->write_lock);
+ if (bin_async) {
+ n=async_tsend_stream(c,fd,buf,len, bin_async_local_write_timeout);
+ } else {
+ n = tsend_stream(fd, buf, len, bin_send_timeout);
+ }
+ lock_release(&c->write_lock);
+
+ return n;
+}
+
+static int proto_bin_send(struct socket_info* send_sock,
+ char* buf, unsigned int len, union sockaddr_union* to, int id)
+{
+ struct tcp_connection *c;
+ struct ip_addr ip;
+ int port;
+ int fd, n;
+
+ port=0;
+
+ if (to){
+ su2ip_addr(&ip, to);
+ port=su_getport(to);
+ n = tcp_conn_get(id, &ip, port, &c, &fd);
+ }else if (id){
+ n = tcp_conn_get(id, 0, 0, &c, &fd);
+ }else{
+ LM_CRIT("tcp_send called with null id & to\n");
+ return -1;
+ }
+
+ if (n<0) {
+ /* error during conn get, return with error too */
+ LM_ERR("failed to aquire connection\n");
+ return -1;
+ }
+
+ /* was connection found ?? */
+ if (c==0) {
+ if (tcp_no_new_conn) {
+ return -1;
+ }
+ LM_DBG("no open tcp connection found, opening new one, async = %d\n",bin_async);
+ /* create tcp connection */
+ if (bin_async) {
+ n = tcpconn_async_connect(send_sock, to, buf, len, &c, &fd);
+ if ( n<0 ) {
+ LM_ERR("async TCP connect failed\n");
+ return -1;
+ }
+ /* connect succeded, we have a connection */
+ if (n==0) {
+ /* connect is still in progress, break the sending
+ * flow now (the actual write will be done when
+ * connect will be completed */
+ LM_DBG("Succesfully started async connection \n");
+ tcp_conn_release(c, 0);
+ return len;
+ }
+ /* our first connect attempt succeeded - go ahead as normal */
+ } else if ((c=bin_sync_connect(send_sock, to, &fd))==0) {
+ LM_ERR("connect failed\n");
+ return -1;
+ }
+
+ goto send_it;
+ }
+
+ /* now we have a connection, let's see what we can do with it */
+ /* BE CAREFUL now as we need to release the conn before exiting !!! */
+ if (fd==-1) {
+ /* connection is not writable because of its state - can we append
+ * data to it for later writting (async writting)? */
+ if (c->state==S_CONN_CONNECTING) {
+ /* the connection is currently in the process of getting
+ * connected - let's append our send chunk as well - just in
+ * case we ever manage to get through */
+ LM_DBG("We have acquired a TCP connection which is still "
+ "pending to connect - delaying write \n");
+ n = add_write_chunk(c,buf,len,1);
+ if (n < 0) {
+ LM_ERR("Failed to add another write chunk to %p\n",c);
+ /* we failed due to internal errors - put the
+ * connection back */
+ tcp_conn_release(c, 0);
+ return -1;
+ }
+
+ /* we succesfully added our write chunk - success */
+ tcp_conn_release(c, 0);
+ return len;
+ } else {
+ /* return error, nothing to do about it */
+ tcp_conn_release(c, 0);
+ return -1;
+ }
+ }
+
+
+send_it:
+ LM_DBG("sending via fd %d...\n",fd);
+
+ n = _bin_write_on_socket(c, fd, buf, len);
+
+ tcp_conn_set_lifetime( c, tcp_con_lifetime);
+
+ LM_DBG("after write: c= %p n=%d fd=%d\n",c, n, fd);
+ /* LM_DBG("buf=\n%.*s\n", (int)len, buf); */
+ if (n<0){
+ LM_ERR("failed to send\n");
+ c->state=S_CONN_BAD;
+ if (c->proc_id != process_no)
+ close(fd);
+ tcp_conn_release(c, 0);
+ return -1;
+ }
+
+ /* only close the FD if not already in the context of our process
+ either we just connected, or main sent us the FD */
+ if (c->proc_id != process_no)
+ close(fd);
+
+ tcp_conn_release(c, (nfd;
+ bytes_free=TCP_BUF_SIZE- (int)(r->pos - r->buf);
+
+ if (bytes_free==0){
+ LM_ERR("buffer overrun, dropping\n");
+ r->error=TCP_REQ_OVERRUN;
+ return -1;
+ }
+again:
+ bytes_read=read(fd, r->pos, bytes_free);
+
+ if(bytes_read==-1){
+ if (errno == EWOULDBLOCK || errno == EAGAIN){
+ return 0; /* nothing has been read */
+ }else if (errno == EINTR) goto again;
+ else{
+ LM_ERR("error reading: %s\n",strerror(errno));
+ r->error=TCP_READ_ERROR;
+ return -1;
+ }
+ }else if (bytes_read==0){
+ c->state=S_CONN_EOF;
+ LM_DBG("EOF on %p, FD %d\n", c, fd);
+ }
+#ifdef EXTRA_DEBUG
+ LM_DBG("read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos);
+#endif
+ r->pos+=bytes_read;
+ return bytes_read;
+}
+
+static int bin_handle_req(struct tcp_req *req,
+ struct tcp_connection *con, int _max_msg_chunks)
+{
+ long size;
+
+ if (req->complete){
+ /* update the timeout - we succesfully read the request */
+ tcp_conn_set_lifetime( con, tcp_con_lifetime);
+ con->timeout = con->lifetime;
+
+ LM_DBG("completely received a message\n");
+ /* rcv.bind_address should always be !=0 */
+ /* just for debugging use sendipv4 as receiving socket FIXME*/
+ con->rcv.proto_reserved1=con->id; /* copy the id */
+
+ /* prepare for next request */
+ size=req->pos - req->parsed;
+
+ if (!size) {
+ /* did not read any more things - we can release
+ * the connection */
+ LM_DBG("Nothing more to read on TCP conn %p, currently in state %d \n",
+ con,con->state);
+ if (req != &bin_current_req) {
+ /* we have the buffer in the connection tied buff -
+ * detach it , release the conn and free it afterwards */
+ con->con_req = NULL;
+ }
+ } else {
+ LM_DBG("We still have things on the pipe - "
+ "keeping connection \n");
+ }
+
+ /* give the message to the registered functions */
+ call_callbacks(req->buf, &con->rcv);
+
+
+ if (!size && req != &bin_current_req) {
+ /* if we no longer need this tcp_req
+ * we can free it now */
+ pkg_free(req);
+ }
+
+ if (size)
+ memmove(req->buf, req->parsed, size);
+
+ init_tcp_req(req, size);
+ con->msg_attempts = 0;
+
+ /* if we still have some unparsed bytes, try to parse them too*/
+ if (size)
+ return 1;
+ } else {
+ /* request not complete - check the if the thresholds are exceeded */
+ if (con->msg_attempts==0)
+ /* if first iteration, set a short timeout for reading
+ * a whole SIP message */
+ con->timeout = get_ticks() + tcp_max_msg_time;
+
+ con->msg_attempts ++;
+ if (con->msg_attempts == _max_msg_chunks) {
+ LM_ERR("Made %u read attempts but message is not complete yet - "
+ "closing connection \n",con->msg_attempts);
+ goto error;
+ }
+
+ if (req == &bin_current_req) {
+ /* let's duplicate this - most likely another conn will come in */
+
+ LM_DBG("We didn't manage to read a full request\n");
+ con->con_req = pkg_malloc(sizeof(struct tcp_req));
+ if (con->con_req == NULL) {
+ LM_ERR("No more mem for dynamic con request buffer\n");
+ goto error;
+ }
+
+ if (req->pos != req->buf) {
+ /* we have read some bytes */
+ memcpy(con->con_req->buf,req->buf,req->pos-req->buf);
+ con->con_req->pos = con->con_req->buf + (req->pos-req->buf);
+ } else {
+ con->con_req->pos = con->con_req->buf;
+ }
+
+ if (req->parsed != req->buf)
+ con->con_req->parsed =con->con_req->buf+(req->parsed-req->buf);
+ else
+ con->con_req->parsed = con->con_req->buf;
+
+ con->con_req->complete=req->complete;
+ con->con_req->content_len=req->content_len;
+ con->con_req->error = req->error;
+ }
+ }
+
+ /* everything ok */
+ return 0;
+error:
+ /* report error */
+ return -1;
+}
+
+static void bin_parse_headers(struct tcp_req *req){
+ unsigned short *px;
+ if(req->content_len == 0 && req->pos - req->buf < HEADER_SIZE){
+ req->parsed = req->pos;
+ return;
+ }
+ px = (unsigned short*)(req->buf + MARKER_SIZE);
+ req->content_len = (*px);
+ if(req->pos - req->buf == req->content_len){
+ LM_DBG("received a COMPLETE message\n");
+ req->complete = 1;
+ req->parsed = req->buf + req->content_len;
+ } else if(req->pos - req->buf > req->content_len){
+ LM_DBG("received MORE then a message\n");
+ req->complete = 1;
+ req->parsed = req->buf + req->content_len;
+ } else {
+ LM_DBG("received only PART of a message\n");
+ req->parsed = req->pos;
+ }
+}
+
+static int bin_read_req(struct tcp_connection* con, int* bytes_read){
+ LM_INFO("reading from connection\n");
+
+ int bytes;
+ int total_bytes;
+ struct tcp_req *req;
+
+ bytes = -1;
+ total_bytes = 0;
+
+ if (con->con_req) {
+ req = con->con_req;
+ LM_DBG("Using the per connection buff \n");
+ } else {
+ LM_DBG("Using the global ( per process ) buff \n");
+ init_tcp_req(&bin_current_req, 0);
+ req = &bin_current_req;
+ }
+
+ again:
+ if(req->error == TCP_REQ_OK){
+ /* if we still have some unparsed part, parse it first,
+ * don't do the read*/
+ if (req->parsed < req->pos){
+ bytes=0;
+ } else {
+ bytes=tcp_read(con,req);
+ if (bytes < 0) {
+ LM_ERR("failed to read \n");
+ goto error;
+ }
+ }
+
+ bin_parse_headers(req);
+
+ total_bytes+=bytes;
+ /* eof check:
+ * is EOF if eof on fd and req. not complete yet,
+ * if req. is complete we might have a second unparsed
+ * request after it, so postpone release_with_eof
+ */
+ if ((con->state==S_CONN_EOF) && (req->complete==0)) {
+ LM_DBG("EOF received\n");
+ goto done;
+ }
+ }
+
+ if (req->error!=TCP_REQ_OK){
+ LM_ERR("bad request, state=%d, error=%d "
+ "buf:\n%.*s\nparsed:\n%.*s\n", req->state, req->error,
+ (int)(req->pos-req->buf), req->buf,
+ (int)(req->parsed-req->start), req->start);
+ LM_DBG("- received from: port %d\n", con->rcv.src_port);
+ print_ip("- received from: ip ",&con->rcv.src_ip, "\n");
+ goto error;
+ }
+
+ switch (bin_handle_req(req, con, bin_max_msg_chunks) ) {
+ case 1:
+ goto again;
+ case -1:
+ goto error;
+ }
+
+ LM_DBG("tcp_read_req end\n");
+done:
+ if (bytes_read) *bytes_read=total_bytes;
+ /* connection will be released */
+ return 0;
+error:
+ /* connection will be released as ERROR */
+ return -1;
+}
+
+static int bin_write_async_req(struct tcp_connection* con,int fd)
+{
+ int n,left;
+ struct bin_send_chunk *chunk;
+ struct bin_data *d = (struct bin_data*)con->proto_data;
+
+ if (d->async_chunks_no == 0) {
+ LM_DBG("The connection has been triggered "
+ " for a write event - but we have no pending write chunks\n");
+ return 0;
+ }
+
+next_chunk:
+ chunk=d->async_chunks[0];
+again:
+ left = (int)((chunk->buf+chunk->len)-chunk->pos);
+ LM_DBG("Trying to send %d bytes from chunk %p in conn %p - %d %d \n",
+ left,chunk,con,chunk->ticks,get_ticks());
+ n = send(fd, chunk->pos, left, 0);
+ if (n<0) {
+ if (errno == EINTR)
+ goto again;
+ else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ LM_DBG("Can't finish to write chunk %p on conn %p\n",
+ chunk,con);
+ /* report back we have more writting to be done */
+ return 1;
+ } else {
+ LM_ERR("Error occured while sending async chunk %d (%s)\n",
+ errno,strerror(errno));
+ /* report the conn as broken */
+ return -1;
+ }
+ }
+
+ if (n < left) {
+ /* partial write */
+ chunk->pos+=n;
+ goto again;
+ } else {
+ /* written a full chunk - move to the next one, if any */
+ shm_free(chunk);
+ d->async_chunks_no--;
+ if (d->async_chunks_no == 0) {
+ LM_DBG("We have finished writing all our async chunks in %p\n",con);
+ d->oldest_chunk=0;
+ /* report back everything ok */
+ return 0;
+ } else {
+ LM_DBG("We still have %d chunks pending on %p\n",
+ d->async_chunks_no,con);
+ memmove(&d->async_chunks[0],&d->async_chunks[1],
+ d->async_chunks_no * sizeof(struct bin_send_chunk*));
+ d->oldest_chunk = d->async_chunks[0]->ticks;
+ goto next_chunk;
+ }
+ }
+}
+
diff --git a/modules/proto_bin/proto_bin.h b/modules/proto_bin/proto_bin.h
new file mode 100644
index 00000000000..55ecc89f533
--- /dev/null
+++ b/modules/proto_bin/proto_bin.h
@@ -0,0 +1,6 @@
+#ifndef _proto_bin__
+#define _proto_bin__
+
+#define MARKER_SIZE 4
+
+#endif
\ No newline at end of file
diff --git a/modules/usrloc/ureplication.c b/modules/usrloc/ureplication.c
index 63a1752d586..081a3b80528 100644
--- a/modules/usrloc/ureplication.c
+++ b/modules/usrloc/ureplication.c
@@ -26,6 +26,7 @@
#include "ureplication.h"
#include "dlist.h"
+#include "../../forward.h"
str repl_module_name = str_init("ul");
@@ -37,6 +38,7 @@ int skip_replicated_db_ops;
void replicate_urecord_insert(urecord_t *r)
{
struct replication_dest *d;
+ str send_buffer;
if (bin_init(&repl_module_name, REPL_URECORD_INSERT) != 0) {
LM_ERR("failed to replicate this event\n");
@@ -46,13 +48,16 @@ void replicate_urecord_insert(urecord_t *r)
bin_push_str(r->domain);
bin_push_str(&r->aor);
+ bin_get_buffer(&send_buffer);
+
for (d = replication_dests; d; d = d->next)
- bin_send(&d->to);
+ msg_send(0,PROTO_BIN,&d->to,0,send_buffer.s,send_buffer.len,0);
}
void replicate_urecord_delete(urecord_t *r)
{
struct replication_dest *d;
+ str send_buffer;
if (bin_init(&repl_module_name, REPL_URECORD_DELETE) != 0) {
LM_ERR("failed to replicate this event\n");
@@ -62,13 +67,16 @@ void replicate_urecord_delete(urecord_t *r)
bin_push_str(r->domain);
bin_push_str(&r->aor);
+ bin_get_buffer(&send_buffer);
+
for (d = replication_dests; d; d = d->next)
- bin_send(&d->to);
+ msg_send(0,PROTO_BIN,&d->to,0,send_buffer.s,send_buffer.len,0);
}
void replicate_ucontact_insert(urecord_t *r, str *contact, ucontact_info_t *ci)
{
struct replication_dest *d;
+ str send_buffer;
str st;
if (bin_init(&repl_module_name, REPL_UCONTACT_INSERT) != 0) {
@@ -104,13 +112,16 @@ void replicate_ucontact_insert(urecord_t *r, str *contact, ucontact_info_t *ci)
st.len = sizeof ci->last_modified;
bin_push_str(&st);
+ bin_get_buffer(&send_buffer);
+
for (d = replication_dests; d; d = d->next)
- bin_send(&d->to);
+ msg_send(0,PROTO_BIN,&d->to,0,send_buffer.s,send_buffer.len,0);
}
void replicate_ucontact_update(urecord_t *r, str *contact, ucontact_info_t *ci)
{
struct replication_dest *d;
+ str send_buffer;
str st;
if (bin_init(&repl_module_name, REPL_UCONTACT_UPDATE) != 0) {
@@ -146,13 +157,16 @@ void replicate_ucontact_update(urecord_t *r, str *contact, ucontact_info_t *ci)
st.len = sizeof ci->last_modified;
bin_push_str(&st);
+ bin_get_buffer(&send_buffer);
+
for (d = replication_dests; d; d = d->next)
- bin_send(&d->to);
+ msg_send(0,PROTO_BIN,&d->to,0,send_buffer.s,send_buffer.len,0);
}
void replicate_ucontact_delete(urecord_t *r, ucontact_t *c)
{
struct replication_dest *d;
+ str send_buffer;
if (bin_init(&repl_module_name, REPL_UCONTACT_DELETE) != 0) {
LM_ERR("failed to replicate this event\n");
@@ -165,8 +179,10 @@ void replicate_ucontact_delete(urecord_t *r, ucontact_t *c)
bin_push_str(&c->callid);
bin_push_int(c->cseq);
+ bin_get_buffer(&send_buffer);
+
for (d = replication_dests; d; d = d->next)
- bin_send(&d->to);
+ msg_send(0,PROTO_BIN,&d->to,0,send_buffer.s,send_buffer.len,0);
}
/* packet receiving */
diff --git a/net/net_tcp_proc.c b/net/net_tcp_proc.c
index 9335fb030ac..6e9d655a0aa 100644
--- a/net/net_tcp_proc.c
+++ b/net/net_tcp_proc.c
@@ -34,6 +34,7 @@
static struct tcp_connection* tcp_conn_lst=0;
static int tcpmain_sock=-1;
+extern int unix_tcp_sock;
static void tcpconn_release(struct tcp_connection* c, long state,int writer)
@@ -58,7 +59,9 @@ static void tcpconn_release(struct tcp_connection* c, long state,int writer)
/* errno==EINTR, EWOULDBLOCK a.s.o todo */
response[0]=(long)c;
response[1]=state;
- if (send_all(tcpmain_sock, response, sizeof(response))<=0)
+
+ if (send_all((state==ASYNC_WRITE)?unix_tcp_sock:tcpmain_sock, response,
+ sizeof(response))<=0)
LM_ERR("send_all failed\n");
}
@@ -76,8 +79,9 @@ void tcp_conn_release(struct tcp_connection* c, int pending_data)
tcpconn_release(c, CONN_ERROR,1);
return;
}
- if (pending_data)
+ if (pending_data) {
tcpconn_release(c, ASYNC_WRITE,1);
+ }
tcpconn_put(c);
return;
}
diff --git a/net/proto_tcp/proto_tcp.c b/net/proto_tcp/proto_tcp.c
index 6d140702d9f..6914f22e599 100644
--- a/net/proto_tcp/proto_tcp.c
+++ b/net/proto_tcp/proto_tcp.c
@@ -600,7 +600,7 @@ static int async_tsend_stream(struct tcp_connection *c,
/* we have succesfully added async write chunk
* tell MAIN to poll out for us */
LM_DBG("Data still pending for write on conn %p\n",c);
- return len;
+ return 0;
}
}
diff --git a/net/trans.c b/net/trans.c
index f32bff310dc..b2c8e972a6e 100644
--- a/net/trans.c
+++ b/net/trans.c
@@ -248,7 +248,7 @@ int fix_all_socket_lists(void)
*p = '\0';
LM_ERR("listeners found for protocol %s, but no module "
- "can handle it\n", buf);
+ "can handle it, %d, %d \n", buf,i,protos[i].id);
goto error;
}
}
diff --git a/scripts/opensipsctl b/scripts/opensipsctl
index 02fb1fcb12a..8c9a0d93fbc 100755
--- a/scripts/opensipsctl
+++ b/scripts/opensipsctl
@@ -10,7 +10,7 @@ PATH=$PATH:/usr/local/sbin/
# for testing only, please don't enable this in production environments
# as this introduce security risks
-TEST="false"
+TEST="true"
### include config files
if [ -f /etc/opensips/opensipsctlrc ]; then
. /etc/opensips/opensipsctlrc
diff --git a/scripts/opensipsctlrc b/scripts/opensipsctlrc
index 2a7b5d988eb..2931d3491aa 100644
--- a/scripts/opensipsctlrc
+++ b/scripts/opensipsctlrc
@@ -109,7 +109,7 @@
# CTLENGINE=xmlrpc
## path to FIFO file
-# OSIPS_FIFO="/tmp/opensips_fifo"
+OSIPS_FIFO="/tmp/opensips_fifo_bkp"
## MI_CONNECTOR control engine: FIFO, UNIXSOCK, UDP, XMLRPC
# MI_CONNECTOR=FIFO:/tmp/opensips_fifo
diff --git a/socket_info.c b/socket_info.c
index 15800b26a31..f26a681fbea 100644
--- a/socket_info.c
+++ b/socket_info.c
@@ -334,6 +334,7 @@ int add_listen_iface(char* name, unsigned short port, unsigned short proto,
unsigned short c_proto;
c_proto=(proto)?proto:PROTO_UDP;
+ LM_INFO("XXX - c_proto = %d\n",c_proto);
do{
list=get_sock_info_list(c_proto);
if (list==0){
diff --git a/socket_info.h b/socket_info.h
index b9cc285fa49..fa87c6f8a10 100644
--- a/socket_info.h
+++ b/socket_info.h
@@ -175,6 +175,9 @@ inline static int parse_proto(unsigned char* s, long len, int* proto)
*proto=PROTO_SCTP; return 0;
}
break;
+ case PROTO2UINT('b', 'i', 'n'):
+ if(len==3) { *proto=PROTO_BIN; return 0; }
+ break;
default:
if(len==2 && (s[0]|0x20)=='w' && (s[1]|0x20)=='s') {
@@ -308,6 +311,11 @@ static inline char* proto2str(int proto, char *p)
*(p++) = 'w';
*(p++) = 's';
break;
+ case PROTO_BIN:
+ *(p++) = 'b';
+ *(p++) = 'i';
+ *(p++) = 'n';
+ break;
default:
LM_CRIT("unsupported proto %d\n", proto);
return 0;