Skip to content

Commit

Permalink
smpp: add better sessions management
Browse files Browse the repository at this point in the history
  • Loading branch information
razvancrainea committed Jan 23, 2019
1 parent dcd0b9b commit 87d5a8b
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 69 deletions.
64 changes: 32 additions & 32 deletions modules/proto_smpp/db.c
Expand Up @@ -21,6 +21,7 @@

#include "../../str.h"
#include "../../resolve.h"
#include "../../lib/list.h"
#include "proto_smpp.h"
#include "../../db/db.h"
#include "db.h"
Expand All @@ -43,25 +44,6 @@ str smpp_session_type_col = str_init("session_type");

int smpp_db_bind(const str *db_url)
{
if (db_bind_mod(db_url, &smpp_dbf)) {
LM_ERR("cannot bind module database\n");
return -1;
}
return 0;
}

int smpp_db_init(const str *db_url)
{
if (smpp_dbf.init == 0) {
LM_ERR("unbound database module\n");
return -1;
}
smpp_db_handle = smpp_dbf.init(db_url);
if (smpp_db_handle == 0){
LM_ERR("cannot initialize database connection\n");
return -1;
}

smpp_table.len = strlen(smpp_table.s);
smpp_name_col.len = strlen(smpp_name_col.s);
smpp_ip_col.len = strlen(smpp_ip_col.s);
Expand All @@ -75,27 +57,31 @@ int smpp_db_init(const str *db_url)
smpp_dst_npi_col.len = strlen(smpp_dst_npi_col.s);
smpp_session_type_col.len = strlen(smpp_session_type_col.s);

if (db_bind_mod(db_url, &smpp_dbf)) {
LM_ERR("cannot bind module database\n");
return -1;
}
return 0;
}

int smpp_query(const str *smpp_table, db_key_t *cols, int col_nr, db_res_t **res)
int smpp_db_init(const str *db_url)
{
if (smpp_dbf.use_table(smpp_db_handle, smpp_table) < 0) {
LM_ERR("error while trying to use smpp table\n");
if (smpp_dbf.init == 0) {
LM_ERR("unbound database module\n");
return -1;
}

if (smpp_dbf.query(smpp_db_handle, NULL, 0, NULL, cols, 0, col_nr, 0, res) < 0) {
LM_ERR("error while querying database\n");
smpp_db_handle = smpp_dbf.init(db_url);
if (smpp_db_handle == 0){
LM_ERR("cannot initialize database connection\n");
return -1;
}

return 0;
}

void smpp_free_results(db_res_t *res)
int smpp_query(const str *smpp_table, db_key_t *cols, int col_nr, db_res_t **res)
{
smpp_dbf.free_result(smpp_db_handle, res);
return 0;
}

void smpp_db_close(void)
Expand All @@ -106,7 +92,7 @@ void smpp_db_close(void)
}
}

void build_smpp_sessions_from_db(void)
int load_smpp_sessions_from_db(struct list_head *head)
{
struct ip_addr *ip;
db_key_t cols[11];
Expand All @@ -116,7 +102,7 @@ void build_smpp_sessions_from_db(void)
smpp_session_t *session;
str ip_s, system_s, pass_s, type_s, name_s;

int i;
int i, n = 0;

cols[0] = &smpp_name_col;
cols[1] = &smpp_ip_col;
Expand All @@ -130,8 +116,18 @@ void build_smpp_sessions_from_db(void)
cols[9] = &smpp_dst_npi_col;
cols[10] = &smpp_session_type_col;

if (smpp_query(&smpp_table, cols, 11, &res) < 0)
return;
INIT_LIST_HEAD(head);

if (smpp_dbf.use_table(smpp_db_handle, &smpp_table) < 0) {
LM_ERR("error while trying to use smpp table\n");
return -1;
}

if (smpp_dbf.query(smpp_db_handle, NULL, 0, NULL, cols, 0, 11, 0, &res) < 0) {
LM_ERR("error while querying database\n");
return -1;
}


row = RES_ROWS(res);

Expand Down Expand Up @@ -233,6 +229,10 @@ void build_smpp_sessions_from_db(void)
i, name_s.len, name_s.s);
continue;
}
list_add(&session->list, head);
n++;
}
smpp_free_results(res);
smpp_dbf.free_result(smpp_db_handle, res);
LM_INFO("Loaded %d SMSc servers\n", n);
return n;
}
1 change: 1 addition & 0 deletions modules/proto_smpp/db.h
Expand Up @@ -41,6 +41,7 @@ extern str smpp_outbound_uri;

int smpp_db_bind(const str *db_url);
int smpp_db_init(const str *db_url);
int load_smpp_sessions_from_db(struct list_head *head);
int smpp_query(const str *smpp_table, db_key_t *cols, int col_nr, db_res_t **res);
void smpp_free_results(db_res_t *res);
void smpp_db_close(void);
Expand Down
11 changes: 9 additions & 2 deletions modules/proto_smpp/proto_smpp.c
Expand Up @@ -46,6 +46,12 @@
#include "utils.h"
#include "db.h"

/*
* TODO:
* - implement reload
* - reconnect when connection is down
*/

extern int proto_tcp_read(struct tcp_connection* ,struct tcp_req* );

static int mod_init(void);
Expand Down Expand Up @@ -147,8 +153,6 @@ static int mod_init(void)
if (smpp_sessions_init() < 0)
return -1;

build_smpp_sessions_from_db();

smpp_db_close();

if (register_timer("enquire-link-timer", enquire_link, NULL, 5,
Expand Down Expand Up @@ -213,6 +217,9 @@ static int child_init(int rank)
{
LM_INFO("initializing child #%d\n", rank);

if (smpp_db_init(&db_url) < 0)
return -1;

if ((rank == 1) && ipc_dispatch_rpc(rpc_bind_sessions, NULL) < 0) {
LM_CRIT("failed to RPC the data loading\n");
return -1;
Expand Down
4 changes: 2 additions & 2 deletions modules/proto_smpp/proto_smpp.h
Expand Up @@ -22,6 +22,7 @@
#ifndef _PROTO_SMPP_H_
#define _PROTO_SMPP_H_

#include "../../lib/list.h"
#include "../../str.h"
#include "smpp.h"

Expand Down Expand Up @@ -53,7 +54,7 @@ typedef struct smpp_session {
uint8_t dest_addr_ton;
uint8_t dest_addr_npi;

struct smpp_session *next;
struct list_head list;
} smpp_session_t;

extern struct tm_binds tmb;
Expand All @@ -64,7 +65,6 @@ struct tcp_connection* smpp_sync_connect(struct socket_info* send_sock,
union sockaddr_union* server, int *fd);

void enquire_link(unsigned int ticks, void *param);
void build_smpp_sessions_from_db(void);
void rpc_bind_sessions(int sender_id, void *param);
void handle_smpp_msg(char *buffer, struct tcp_connection *conn);
void send_submit_or_deliver_request(str *msg, str *src, str *dst,
Expand Down
98 changes: 65 additions & 33 deletions modules/proto_smpp/smpp.c
Expand Up @@ -41,8 +41,9 @@
#include "../../parser/parse_from.h"

#include "proto_smpp.h"
#include "smpp.h"
#include "utils.h"
#include "smpp.h"
#include "db.h"

str smpp_outbound_uri;

Expand All @@ -55,7 +56,9 @@ static uint32_t increment_sequence_number(smpp_session_t *session);
/** TM bind */
struct tm_binds tmb;

static smpp_session_t **g_sessions = NULL;
//static smpp_session_t **g_sessions = NULL;
struct list_head *g_sessions;
rw_lock_t *smpp_lock; /* reader-writers lock for reloading the data */

static uint32_t get_payload_from_header(char *payload, smpp_header_t *header)
{
Expand Down Expand Up @@ -533,29 +536,42 @@ void send_bind(smpp_session_t *session)
pkg_free(req);
}

void rpc_bind_sessions(int sender_id, void *param)
void smpp_bind_sessions(struct list_head *list)
{
int n = 0;
smpp_session_t *session_it = *g_sessions;
while (session_it) {
if (session_it->session_type == SMPP_OUTBIND)
send_outbind(session_it);
struct list_head *l;
smpp_session_t *session;

list_for_each(l, list) {
session = list_entry(l, smpp_session_t, list);
if (session->session_type == SMPP_OUTBIND)
send_outbind(session);
else
send_bind(session_it);
session_it = session_it->next;
n++;
send_bind(session);
}
LM_INFO("sent %d\n", n);
}

void rpc_bind_sessions(int sender_id, void *param)
{
if (load_smpp_sessions_from_db(g_sessions) < 0) {
LM_INFO("cannot load smpp sessions!\n");
return;
}
smpp_bind_sessions(g_sessions);
}


void enquire_link(unsigned int ticks, void *params)
{
smpp_session_t *session_it = *g_sessions;
while (session_it) {
send_enquire_link_request(session_it);
session_it = session_it->next;
struct list_head *l;
smpp_session_t *session;

lock_start_read(smpp_lock);

list_for_each(l, g_sessions) {
session = list_entry(l, smpp_session_t, list);
send_enquire_link_request(session);
}
lock_stop_read(smpp_lock);
}


Expand Down Expand Up @@ -689,8 +705,13 @@ void send_submit_or_deliver_resp(smpp_submit_sm_req_t *req, struct tcp_connectio

uint32_t bind_session(smpp_bind_transceiver_t *body, struct tcp_connection *conn)
{
smpp_session_t *session_it = *g_sessions;
for (session_it = *g_sessions; session_it; session_it = session_it->next) {
struct list_head *l;
smpp_session_t *session_it;

lock_start_read(smpp_lock);

list_for_each(l, g_sessions) {
session_it = list_entry(l, smpp_session_t, list);
// TODO what if there is no \0 at the end, but they
// match
if (strncmp(session_it->bind.transceiver.system_id, body->system_id, MAX_SYSTEM_ID_LEN) != 0)
Expand All @@ -706,8 +727,10 @@ uint32_t bind_session(smpp_bind_transceiver_t *body, struct tcp_connection *conn
LM_INFO("successfully found \"%.*s\"\n", MAX_SYSTEM_ID_LEN, body->system_id);
session_it->conn = conn;
conn->proto_data = session_it;
lock_stop_read(smpp_lock);
return ESME_ROK;
}
lock_stop_read(smpp_lock);
LM_WARN("no system_id matched \"%.*s\"\n", MAX_SYSTEM_ID_LEN, body->system_id);
return ESME_RBINDFAIL;
}
Expand Down Expand Up @@ -994,8 +1017,12 @@ void handle_smpp_msg(char *buffer, struct tcp_connection *conn)
void send_submit_or_deliver_request(str *msg, str *src, str *dst,
smpp_session_t *session)
{
struct tcp_connection *conn;
int ret, fd, n;

/* TODO: fix this */
session = *g_sessions;
if (!session)
session = list_entry(g_sessions->next, smpp_session_t, list);

smpp_submit_sm_req_t *req;
LM_DBG("sending submit_sm\n");
Expand All @@ -1007,15 +1034,14 @@ void send_submit_or_deliver_request(str *msg, str *src, str *dst,
LM_ERR("error creating submit_sm request\n");
return;
}
struct tcp_connection *conn;
int fd;
int ret = tcp_conn_get(0, &(*g_sessions)->ip, (*g_sessions)->port,
ret = tcp_conn_get(0, &session->ip, session->port,
PROTO_SMPP, &conn, &fd);
if (ret < 0) {
LM_ERR("return code %d\n", ret);
goto free_req;
}
int n = tsend_stream(fd, req->payload.s, req->payload.len, 1000);
/* TODO: handle send timeout */
n = tsend_stream(fd, req->payload.s, req->payload.len, 1000);
LM_INFO("send %d bytes\n", n);

free_req:
Expand All @@ -1024,21 +1050,26 @@ void send_submit_or_deliver_request(str *msg, str *src, str *dst,

static void send_enquire_link_request(smpp_session_t *session)
{
struct tcp_connection *conn;
int fd, ret, n;

smpp_enquire_link_req_t *req;
if (build_enquire_link_request(&req, session)) {
LM_ERR("error creating enquire_link_sm request\n");
return;
}

struct tcp_connection *conn;
int fd;
int ret = tcp_conn_get(0, &(*g_sessions)->ip, (*g_sessions)->port,
/* TODO: fix this */
if (!session)
session = list_entry(g_sessions->next, smpp_session_t, list);

ret = tcp_conn_get(0, &session->ip, session->port,
PROTO_SMPP, &conn, &fd);
if (ret < 0) {
LM_ERR("return code %d\n", ret);
goto free_req;
}
int n = tsend_stream(fd, req->payload.s, req->payload.len, 1000);
n = tsend_stream(fd, req->payload.s, req->payload.len, 1000);
LM_INFO("send %d bytes\n", n);

free_req:
Expand Down Expand Up @@ -1090,11 +1121,16 @@ static int recv_smpp_msg(smpp_header_t *header, smpp_deliver_sm_t *body, struct

int smpp_sessions_init(void)
{
g_sessions = shm_malloc(sizeof(smpp_session_t*));
g_sessions = shm_malloc(sizeof(*g_sessions));
if (!g_sessions) {
LM_CRIT("failed to allocate shared memory for sessions pointer\n");
return -1;
}
smpp_lock = lock_init_rw();
if (!smpp_lock) {
LM_CRIT("cannot allocate shared memory fir smpp_lock\n");
return -1;
}
return 0;
}

Expand Down Expand Up @@ -1153,11 +1189,7 @@ smpp_session_t *smpp_session_new(str *name, struct ip_addr *ip, int port,
session->dest_addr_npi = dst_addr_npi;
session->session_type = stype;

LM_DBG("Added %.*s SMSC\n", name->len, name->s);
LM_DBG("Added %.*s SMSC %p\n", name->len, name->s, session);

/* TODO: now link it to global list, but in the future, add it tmp list */
if (*g_sessions)
session->next = *g_sessions;
*g_sessions = session;
return session;
}

0 comments on commit 87d5a8b

Please sign in to comment.