Skip to content

Commit

Permalink
Add reconnection logic to the SBUS
Browse files Browse the repository at this point in the history
Any client of the SBUS that wants to implement automatic
reconnection may now call sbus_reconnect_init to set it up.
The clients will need to set up a callback to handle the result
of the reconnection and (in the case of a successful reconnection)
readd the method handlers to the connection context.
  • Loading branch information
sgallagher authored and simo5 committed Mar 20, 2009
1 parent 76c4a8c commit 6a3ca34
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 1 deletion.
17 changes: 17 additions & 0 deletions server/sbus/sssd_dbus.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ typedef int (*sbus_msg_handler_fn)(DBusMessage *, struct sbus_conn_ctx *);
*/
typedef int (*sbus_conn_destructor_fn)(void *);

typedef void (*sbus_conn_reconn_callback_fn)(struct sbus_conn_ctx *, int, void *);

/*
* sbus_server_conn_init_fn
* Set up function for connection-specific activities
Expand All @@ -48,6 +50,12 @@ enum {
SBUS_CONN_TYPE_SHARED
};

enum {
SBUS_RECONNECT_SUCCESS = 1,
SBUS_RECONNECT_EXCEEDED_RETRIES,
SBUS_RECONNECT_ERROR
};

/* Special interface and method for D-BUS introspection */
#define DBUS_INTROSPECT_INTERFACE "org.freedesktop.DBus.Introspectable"
#define DBUS_INTROSPECT_METHOD "Introspect"
Expand Down Expand Up @@ -117,6 +125,15 @@ int sbus_conn_add_method_ctx(struct sbus_conn_ctx *conn_ctx,
struct sbus_method_ctx *method_ctx);
bool sbus_conn_disconnecting(struct sbus_conn_ctx *conn_ctx);

/* max_retries < 0: retry forever
* max_retries = 0: never retry (why are you calling this function?)
* max_retries > 0: obvious
*/
void sbus_reconnect_init(struct sbus_conn_ctx *conn_ctx,
int max_retries,
sbus_conn_reconn_callback_fn callback,
void *pvt);

/* Default message handler
* Should be usable for most cases */
DBusHandlerResult sbus_message_handler(DBusConnection *conn,
Expand Down
190 changes: 189 additions & 1 deletion server/sbus/sssd_dbus_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@ struct dbus_ctx_list;
struct sbus_conn_ctx {
DBusConnection *conn;
struct tevent_context *ev;
char *address;
int connection_type;
int disconnect;
struct sbus_method_ctx *method_ctx_list;
sbus_conn_destructor_fn destructor;
void *pvt_data; /* Private data for this connection */

int retries;
int max_retries;
sbus_conn_reconn_callback_fn reconnect_callback;
/* Private data needed to reinit after reconnection */
void *reconnect_pvt;
};

struct sbus_message_handler_ctx {
Expand All @@ -39,6 +46,8 @@ static int _method_list_contains_path(struct sbus_method_ctx *list,
struct sbus_method_ctx *method);
static void sbus_unreg_object_paths(struct sbus_conn_ctx *dct_ctx);

static int sbus_auto_reconnect(struct sbus_conn_ctx *conn_ctx);

static void sbus_dispatch(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval tv, void *data)
Expand All @@ -55,6 +64,32 @@ static void sbus_dispatch(struct tevent_context *ev,
conn = dct_ctx->conn;
DEBUG(6, ("conn: %lX\n", conn));

if (dct_ctx->retries > 0) {
DEBUG(6, ("SBUS is reconnecting. Deferring.\n"));
/* Currently trying to reconnect, defer dispatch */
new_event = tevent_add_timer(ev, dct_ctx, tv, sbus_dispatch, dct_ctx);
if (new_event == NULL) {
DEBUG(0,("Could not defer dispatch!\n"));
}
return;
}

if ((!dbus_connection_get_is_connected(conn)) &&
(dct_ctx->max_retries != 0)) {
/* Attempt to reconnect automatically */
ret = sbus_auto_reconnect(dct_ctx);
if (ret == EOK) {
DEBUG(1, ("Performing auto-reconnect\n"));
return;
}

DEBUG(0, ("Cannot start auto-reconnection.\n"));
dct_ctx->reconnect_callback(dct_ctx,
SBUS_RECONNECT_ERROR,
dct_ctx->reconnect_pvt);
return;
}

if((dct_ctx->disconnect) || (!dbus_connection_get_is_connected(conn))) {
DEBUG(3,("Connection is not open for dispatching.\n"));
/*
Expand Down Expand Up @@ -266,6 +301,8 @@ static void sbus_conn_wakeup_main(void *data)
}
}

static int sbus_add_connection_int(struct sbus_conn_ctx **dct_ctx);

/*
* integrate_connection_with_event_loop
* Set up a D-BUS connection to use the libevents mainloop
Expand All @@ -277,18 +314,39 @@ int sbus_add_connection(TALLOC_CTX *ctx,
struct sbus_conn_ctx **dct_ctx,
int connection_type)
{
dbus_bool_t dbret;
struct sbus_conn_ctx *dt_ctx;
int ret;

DEBUG(5,("Adding connection %lX\n", dbus_conn));
dt_ctx = talloc_zero(ctx, struct sbus_conn_ctx);

dt_ctx->ev = ev;
dt_ctx->conn = dbus_conn;
dt_ctx->connection_type = connection_type;
dt_ctx->disconnect = 0;

/* This will be replaced on the first call to sbus_conn_add_method_ctx() */
dt_ctx->method_ctx_list = NULL;

/* This can be overridden by a call to sbus_reconnect_init() */
dt_ctx->retries = 0;
dt_ctx->max_retries = 0;
dt_ctx->reconnect_callback = NULL;

ret = sbus_add_connection_int(&dt_ctx);
if (ret != EOK) {
talloc_free(dt_ctx);
return ret;
}
*dct_ctx = dt_ctx;
return EOK;
}

static int sbus_add_connection_int(struct sbus_conn_ctx **dct_ctx)
{
dbus_bool_t dbret;
struct sbus_conn_ctx *dt_ctx = *dct_ctx;

/*
* Set the default destructor
* Connections can override this with
Expand Down Expand Up @@ -368,6 +426,9 @@ int sbus_new_connection(TALLOC_CTX *ctx, struct tevent_context *ev,
/* FIXME: release resources */
}

/* Store the address for later reconnection */
(*dct_ctx)->address = talloc_strdup((*dct_ctx), address);

dbus_connection_set_exit_on_disconnect((*dct_ctx)->conn, FALSE);

/* Set connection destructor */
Expand Down Expand Up @@ -659,6 +720,133 @@ void *sbus_conn_get_private_data(struct sbus_conn_ctx *conn_ctx)
return conn_ctx->pvt_data;
}

static void sbus_reconnect(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval tv, void *data)
{
DBusConnection *dbus_conn;
DBusError dbus_error;
struct tevent_timer *event;
struct sbus_method_ctx *iter;
struct sbus_method_ctx *purge;
int ret;
struct sbus_conn_ctx *conn_ctx =
talloc_get_type(data, struct sbus_conn_ctx);

DEBUG(3, ("Making reconnection attempt %d to [%s]\n", conn_ctx->retries, conn_ctx->address));
/* Make a new connection to the D-BUS address */
dbus_error_init(&dbus_error);
dbus_conn = dbus_connection_open(conn_ctx->address, &dbus_error);
if (dbus_conn) {
/* We successfully reconnected. Set up mainloop
* integration.
*/
DEBUG(3, ("Reconnected to [%s]\n", conn_ctx->address));
conn_ctx->conn = dbus_conn;
ret = sbus_add_connection_int(&conn_ctx);
if (ret != EOK) {
dbus_connection_unref(dbus_conn);
goto failed;
}

/* Remove object paths (the reconnection callback must re-add these */
iter = conn_ctx->method_ctx_list;
while (iter != NULL) {
DLIST_REMOVE(conn_ctx->method_ctx_list, iter);
purge = iter;
iter = iter->next;
talloc_unlink(conn_ctx, purge);
}

/* Reset retries to 0 to resume dispatch processing */
conn_ctx->retries = 0;

/* Notify the owner of this connection that the
* reconnection was successful
*/
conn_ctx->reconnect_callback(conn_ctx,
SBUS_RECONNECT_SUCCESS,
conn_ctx->reconnect_pvt);
return;
}

failed:
/* Reconnection failed, try again in a few seconds */
DEBUG(1, ("Failed to open connection: name=%s, message=%s\n",
dbus_error.name, dbus_error.message));
if (dbus_error_is_set(&dbus_error)) dbus_error_free(&dbus_error);

conn_ctx->retries++;

/* Check if we've passed our last chance or if we've lost track of
* our retry count somehow
*/
if (((conn_ctx->max_retries > 0) &&
(conn_ctx->retries > conn_ctx->max_retries)) ||
conn_ctx->retries <= 0) {
conn_ctx->reconnect_callback(conn_ctx,
SBUS_RECONNECT_EXCEEDED_RETRIES,
conn_ctx->reconnect_pvt);
}

if (conn_ctx->retries == 2) {
tv.tv_sec += 3; /* Wait 3 seconds before the second reconnect attempt */
}
else if (conn_ctx->retries == 3) {
tv.tv_sec += 10; /* Wait 10 seconds before the third reconnect attempt */
}
else {
tv.tv_sec += 30; /* Wait 30 seconds before all subsequent reconnect attempts */
}
event = tevent_add_timer(conn_ctx->ev, conn_ctx, tv, sbus_reconnect, conn_ctx);
if (event == NULL) {
conn_ctx->reconnect_callback(conn_ctx,
SBUS_RECONNECT_ERROR,
conn_ctx->reconnect_pvt);
}
}

/* This function will free and recreate the sbus_conn_ctx,
* calling functions need to be aware of this (and whether
* they have attached a talloc destructor to the
* sbus_conn_ctx.
*/
static int sbus_auto_reconnect(struct sbus_conn_ctx *conn_ctx)
{
struct tevent_timer *te = NULL;
struct timeval tv;

conn_ctx->retries++;
if ((conn_ctx->max_retries > 0) &&
(conn_ctx->retries >= conn_ctx->max_retries)) {
/* Return EAGAIN (to tell the calling process it
* needs to create a new connection from scratch
*/
return EAGAIN;
}

gettimeofday(&tv, NULL);
tv.tv_sec += 1; /* Wait 1 second before the first reconnect attempt */
te = tevent_add_timer(conn_ctx->ev, conn_ctx, tv, sbus_reconnect, conn_ctx);
if (te == NULL) return EAGAIN;

return EOK;
}

/* Max retries */
void sbus_reconnect_init(struct sbus_conn_ctx *conn_ctx,
int max_retries,
sbus_conn_reconn_callback_fn callback,
void *pvt)
{
if(max_retries == 0 || callback == NULL) return;

conn_ctx->retries = 0;
conn_ctx->max_retries = max_retries;
conn_ctx->reconnect_callback = callback;
conn_ctx->reconnect_pvt = pvt;
}

bool sbus_conn_disconnecting(struct sbus_conn_ctx *conn_ctx)
{
if (conn_ctx->disconnect == 1) return true;
Expand Down

0 comments on commit 6a3ca34

Please sign in to comment.