Skip to content

Commit

Permalink
[fix functional bug][hep] multiple versions/protocols can be used in …
Browse files Browse the repository at this point in the history
…the same time

Since commit 0873c14 multiple
network protocols can be used from one module. Proto_hep was changed
to be able to use both tcp and udp in the same time. Moreover, there
is no hep_version needed since we can receive HEPv1,v2 and v3 on any
UDP interface. Since HEPv1 and v2 are not compatible with TCP, only
HEPv3 will be support. For HEPv3, Sipcapture module can now relay
packets from TCP to UDP or the reverse way.
Siptrace module was also improved. Trace id's can now be defined with
hep version and transport protocol, in order to be able to do multiple
types of tracing with different HEP protocols and/or network transports.

(cherry picked from commit 45a3aef)

Conflicts:
	modules/siptrace/siptrace.c
  • Loading branch information
ionutrazvanionita committed May 3, 2016
1 parent 5e8f0ac commit b2ff3cc
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 140 deletions.
2 changes: 1 addition & 1 deletion ip_addr.h
Expand Up @@ -51,7 +51,7 @@

enum sip_protos { PROTO_NONE = 0, PROTO_FIRST = 1, PROTO_UDP = 1, \
PROTO_TCP, PROTO_TLS, PROTO_SCTP, PROTO_WS, PROTO_WSS, PROTO_BIN,
PROTO_HEP, PROTO_OTHER };
PROTO_HEP_UDP, PROTO_HEP_TCP, PROTO_OTHER };
#define PROTO_LAST PROTO_OTHER

struct ip_addr{
Expand Down
2 changes: 1 addition & 1 deletion modules/nathelper/sip_pinger.h
Expand Up @@ -290,7 +290,7 @@ build_sipping(udomain_t *d, str *curi, struct socket_info* s,str *path,
{
#define s_len(_s) (sizeof(_s)-1)
static char buf[MAX_SIPPING_SIZE];
char *p, proto_str[4];
char *p, proto_str[PROTO_NAME_MAX_SIZE];
str address, port;
str st;
int len;
Expand Down
22 changes: 0 additions & 22 deletions modules/proto_hep/doc/proto_hep_admin.xml
Expand Up @@ -168,28 +168,6 @@ modparam("proto_bin", "hep_async", 0)
...
modparam("proto_hep", "hep_async_max_postponed_chunks", 16)
...
</programlisting>
</example>
</section>
<section>
<title><varname>hep_version</varname> (integer)</title>
<para>
The parameter indicate the version of HEP protocol.
Can be 1, 2 or 3. In HEPv2 the timestamp and capture agent ID will
be included to HEP header. Version 1 and 2 uses UDP and version 3
uses TCP.
</para>
<para>
<emphasis>
Default value is "3".
</emphasis>
</para>
<example>
<title>Set <varname>hep_version</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("proto_hep", "hep_version", 2)
...
</programlisting>
</example>
</section>
Expand Down
14 changes: 8 additions & 6 deletions modules/proto_hep/hep.c
Expand Up @@ -44,7 +44,6 @@
#define GENERIC_VENDOR_ID 0x0000
#define HEP_PROTO_SIP 0x01

extern int hep_version;
extern int hep_capture_id;
extern int payload_compression;

Expand All @@ -53,11 +52,12 @@ extern compression_api_t compression_api;
static int pack_hepv3(union sockaddr_union* from_su, union sockaddr_union* to_su,
int proto, char *payload, int plen, char **retbuf, int *retlen);
static int pack_hepv2(union sockaddr_union* from_su, union sockaddr_union* to_su,
int proto, char *payload, int plen, char **retbuf, int *retlen);
int proto, char *payload, int plen, int hep_version,
char **retbuf, int *retlen);

/*
*
* pack as hep; version depends on hep_version
* pack as hep; version depends
* @in1 source sockkadr
* @in2 dest sockkadr
* @in3 protocolo
Expand All @@ -68,14 +68,15 @@ static int pack_hepv2(union sockaddr_union* from_su, union sockaddr_union* to_su
* it's your job to free the buffers
*/
int pack_hep(union sockaddr_union* from_su, union sockaddr_union* to_su,
int proto, char *payload, int plen, char **retbuf, int *retlen)
int proto, char *payload, int plen, int hep_version, char **retbuf,
int *retlen)
{

switch (hep_version) {
case 1:
case 2:
if (pack_hepv2(from_su, to_su, proto, payload,
plen, retbuf, retlen) < 0) {
plen, hep_version, retbuf, retlen) < 0) {
LM_ERR("failed to pack using hep protocol version 3\n");
return -1;
}
Expand Down Expand Up @@ -317,7 +318,8 @@ static int pack_hepv3(union sockaddr_union* from_su, union sockaddr_union* to_su
*/

static int pack_hepv2(union sockaddr_union* from_su, union sockaddr_union* to_su,
int proto, char *payload, int plen, char **retbuf, int *retlen)
int proto, char *payload, int plen, int hep_version,
char **retbuf, int *retlen)
{
char* buffer;
unsigned int totlen=0, buflen=0;
Expand Down
6 changes: 4 additions & 2 deletions modules/proto_hep/hep.h
Expand Up @@ -231,14 +231,16 @@ struct hep_context {
};

int pack_hep(union sockaddr_union* from_su, union sockaddr_union* to_su,
int proto, char *payload, int plen, char **retbuf, int *retlen);
int proto, char *payload, int plen, int hep_version,
char **retbuf, int *retlen);
int unpack_hepv2(char *buf, int len, struct hep_desc* h);
int unpack_hepv3(char *buf, int len, struct hep_desc *h);
int unpack_hep(char *buf, int len, int version, struct hep_desc* h);


typedef int (*pack_hep_t)(union sockaddr_union* from_su, union sockaddr_union* to_su,
int proto, char *payload, int plen, char **retbuf, int *retlen);
int proto, char *payload, int plen, int hep_version,
char **retbuf, int *retlen);
typedef int (*get_hep_ctx_id_t)(void);
#endif

3 changes: 0 additions & 3 deletions modules/proto_hep/hep_cb.c
Expand Up @@ -42,7 +42,6 @@
#include "hep.h"
#include "hep_cb.h"

extern int hep_version;
extern int hep_ctx_idx;

struct hep_cb_list {
Expand Down Expand Up @@ -123,8 +122,6 @@ int bind_proto_hep(proto_hep_api_t *api)
return -1;
}

api->version = hep_version;

api->pack_hep = pack_hep;
api->register_hep_cb = register_hep_cb;
api->get_hep_ctx_id = get_hep_ctx_id;
Expand Down
2 changes: 0 additions & 2 deletions modules/proto_hep/hep_cb.h
Expand Up @@ -46,8 +46,6 @@ typedef int (*register_hep_cb_t)(hep_cb_t cb);
*/

typedef struct proto_hep_api {
int version;

register_hep_cb_t register_hep_cb;
pack_hep_t pack_hep;
get_hep_ctx_id_t get_hep_ctx_id;
Expand Down
129 changes: 65 additions & 64 deletions modules/proto_hep/proto_hep.c
Expand Up @@ -53,8 +53,9 @@

static int mod_init(void);
static void destroy(void); /*!< Module destroy function */
static int proto_hep_init(struct proto_info *pi);
static int proto_hep_init_listener(struct socket_info *si);
static int proto_hep_init_udp(struct proto_info *pi);
static int proto_hep_init_tcp(struct proto_info *pi);
static int proto_hep_init_udp_listener(struct socket_info *si);
static int hep_conn_init(struct tcp_connection* c);
static void hep_conn_clean(struct tcp_connection* c);
static int hep_write_async_req(struct tcp_connection* con,int fd);
Expand All @@ -78,7 +79,6 @@ static int hep_async_local_write_timeout = 10;

int hep_ctx_idx=0;

int hep_version = 3;
int hep_capture_id = 1;
int payload_compression=0;

Expand Down Expand Up @@ -110,7 +110,8 @@ struct hep_data {


static cmd_export_t cmds[] = {
{"proto_init", (cmd_function)proto_hep_init, 0, 0, 0, 0},
{"proto_init", (cmd_function)proto_hep_init_udp, 0, 0, 0, 0},
{"proto_init", (cmd_function)proto_hep_init_tcp, 0, 0, 0, 0},
{"load_hep", (cmd_function)bind_proto_hep, 1, 0, 0, 0},
{0,0,0,0,0,0}
};
Expand All @@ -123,7 +124,6 @@ static param_export_t params[] = {
{ "hep_async_max_postponed_chunks", INT_PARAM,
&hep_async_max_postponed_chunks },
/* what protocol shall be used: 1, 2 or 3 */
{ "hep_version", INT_PARAM, &hep_version },
{ "hep_capture_id", INT_PARAM, &hep_capture_id },
{ "hep_async_local_connect_timeout", INT_PARAM,
&hep_async_local_connect_timeout},
Expand All @@ -138,7 +138,7 @@ static module_dependency_t *get_deps_compression(param_export_t *param)
{
int do_compression= *(int *)param->param_pointer;

if (hep_version < 3 || do_compression == 0)
if (do_compression == 0)
return NULL;

return alloc_module_dep(MOD_TYPE_DEFAULT, "compression", DEP_ABORT);
Expand Down Expand Up @@ -178,13 +178,6 @@ struct module_exports exports = {

static int mod_init(void)
{
LM_INFO("initializing HEP protocol\n");
if (hep_version < HEP_FIRST || hep_version > HEP_LAST) {
LM_WARN("invalid protocol version!"
"Assuming last protocol version [%d]!\n", HEP_LAST);
hep_version = HEP_LAST;
}

if (payload_compression) {
load_compression =
(load_compression_f)find_export("load_compression", 1, 0);
Expand Down Expand Up @@ -242,46 +235,51 @@ void free_hep_context(void *ptr)
}


static int proto_hep_init(struct proto_info *pi)
static int proto_hep_init_udp(struct proto_info *pi)
{

pi->id = PROTO_HEP;
pi->name = "hep";
pi->id = PROTO_HEP_UDP;
pi->name = "hep_udp";
pi->default_port = hep_port;
pi->tran.init_listener = proto_hep_init_listener;
pi->tran.init_listener = proto_hep_init_udp_listener;

switch (hep_version) {
case 1:
case 2:
pi->tran.send = hep_udp_send;
pi->tran.send = hep_udp_send;

pi->net.flags = PROTO_NET_USE_UDP;
pi->net.read = (proto_net_read_f)hep_udp_read_req;
pi->net.flags = PROTO_NET_USE_UDP;
pi->net.read = (proto_net_read_f)hep_udp_read_req;

break;
case 3:

pi->tran.dst_attr = tcp_conn_fcntl;
return 0;
}

static int proto_hep_init_tcp(struct proto_info *pi)
{

pi->id = PROTO_HEP_TCP;
pi->name = "hep_tcp";
pi->default_port = hep_port;
pi->tran.init_listener = tcp_init_listener;

pi->net.flags = PROTO_NET_USE_TCP;
pi->tran.dst_attr = tcp_conn_fcntl;

pi->net.read = (proto_net_read_f)hep_tcp_read_req;
pi->net.write = (proto_net_write_f)hep_write_async_req;
pi->net.flags = PROTO_NET_USE_TCP;

pi->tran.send = hep_tcp_send;
pi->net.read = (proto_net_read_f)hep_tcp_read_req;
pi->net.write = (proto_net_write_f)hep_write_async_req;

pi->tran.send = hep_tcp_send;

if (hep_async) {
pi->net.conn_init = hep_conn_init;
pi->net.conn_clean = hep_conn_clean;
}

break;
if (hep_async) {
pi->net.conn_init = hep_conn_init;
pi->net.conn_clean = hep_conn_clean;
}

return 0;
}



static int hep_conn_init(struct tcp_connection* c)
{
struct hep_data *d;
Expand Down Expand Up @@ -317,29 +315,9 @@ static void hep_conn_clean(struct tcp_connection* c)
}


static int proto_hep_init_listener(struct socket_info *si)
static int proto_hep_init_udp_listener(struct socket_info *si)
{
/* we do not do anything particular, so
* transparently use the generic listener init from net TCP layer
* or net UDP depending on hep version */
int ret = -1;

switch (hep_version) {
case 1:
case 2:
ret = udp_init_listener(si, hep_async?O_NONBLOCK:0);

break;
case 3:
ret = tcp_init_listener(si);

break;

default:
LM_ERR("hep version [%d] not implemented\n", hep_version);
}

return ret;
return udp_init_listener(si, hep_async?O_NONBLOCK:0);
}

static int add_write_chunk(struct tcp_connection *con,char *buf,int len,
Expand Down Expand Up @@ -1239,11 +1217,26 @@ static int hep_udp_read_req(struct socket_info *si, int* bytes_read)
memset(hep_ctx, 0, sizeof(struct hep_context));
memcpy(&hep_ctx->ri, &ri, sizeof(struct receive_info));

if (unpack_hepv2(buf, len, &hep_ctx->h)) {
LM_ERR("hep unpacking failed\n");

if (len < 4) {
LM_ERR("invalid message! too short!\n");
return -1;
}

if (!memcmp(buf, HEP_HEADER_ID, HEP_HEADER_ID_LEN)) {
/* HEPv3 */
if (unpack_hepv3(buf, len, &hep_ctx->h)) {
LM_ERR("hepv3 unpacking failed\n");
return -1;
}
} else {
/* HEPv2 */
if (unpack_hepv2(buf, len, &hep_ctx->h)) {
LM_ERR("hepv12 unpacking failed\n");
return -1;
}
}

/* set context for receive_msg */
if ((ctx=context_alloc(CONTEXT_GLOBAL)) == NULL) {
LM_ERR("failed to allocate new context! skipping...\n");
Expand All @@ -1267,12 +1260,20 @@ static int hep_udp_read_req(struct socket_info *si, int* bytes_read)
}
current_processing_ctx = NULL;

msg.len = len - hep_ctx->h.u.hepv12.hdr.hp_l;
msg.s = buf + hep_ctx->h.u.hepv12.hdr.hp_l;
if (hep_ctx->h.version == 3) {
/* HEPv3 */
msg.len =
hep_ctx->h.u.hepv3.payload_chunk.chunk.length- sizeof(hep_chunk_t);
msg.s = hep_ctx->h.u.hepv3.payload_chunk.data;
} else {
/* HEPv12 */
msg.len = len - hep_ctx->h.u.hepv12.hdr.hp_l;
msg.s = buf + hep_ctx->h.u.hepv12.hdr.hp_l;

if (hep_ctx->h.u.hepv12.hdr.hp_v == 2) {
msg.s += sizeof(struct hep_timehdr);
msg.len -= sizeof(struct hep_timehdr);
if (hep_ctx->h.u.hepv12.hdr.hp_v == 2) {
msg.s += sizeof(struct hep_timehdr);
msg.len -= sizeof(struct hep_timehdr);
}
}

if (ri.src_port==0){
Expand Down

0 comments on commit b2ff3cc

Please sign in to comment.