Skip to content

Commit

Permalink
Promote BSOCK to a proper class implementation.
Browse files Browse the repository at this point in the history
First steps in abstracting network layer.

It would be nice to use other communication sockets then
only TCP ones as its done now. The whole socket abstraction needs
a major overhaul and some of the old bnet functions can be retired
now permanently and some functions are better written as methods
of the BSOCK class so we can write socket type specific implementations
of them.

This implements a BSOCK_TCP class which has the now used TCP sockets and
a dummy BSOCK_SCTP class which will eventually hold the SCTP sockets
and a dummy BSOCK_UDT class which will eventually hold the UDT sockets.
  • Loading branch information
Marco van Wieringen committed Dec 9, 2013
1 parent ca578b1 commit 49e41f5
Show file tree
Hide file tree
Showing 36 changed files with 2,075 additions and 1,209 deletions.
13 changes: 6 additions & 7 deletions src/console/console.c
Expand Up @@ -933,8 +933,6 @@ static bool select_director(CONFIG *config, const char *director, DIRRES **ret_d
BSOCK *UA_sock;
DIRRES *dir = NULL;
CONRES *cons = NULL;
struct sockaddr client_addr;
memset(&client_addr, 0, sizeof(client_addr));

*ret_cons = NULL;
*ret_dir = NULL;
Expand Down Expand Up @@ -969,7 +967,7 @@ static bool select_director(CONFIG *config, const char *director, DIRRES **ret_d
}

if (!dir) { /* prompt for director */
UA_sock = init_bsock(NULL, 0, "", "", 0, &client_addr);
UA_sock = New(BSOCK_TCP);
try_again:
sendit(_("Available Directors:\n"));
LockRes(config);
Expand All @@ -996,7 +994,7 @@ static bool select_director(CONFIG *config, const char *director, DIRRES **ret_d
senditf(_("You must enter a number between 1 and %d\n"), numdir);
goto try_again;
}
term_bsock(UA_sock);
delete UA_sock;
LockRes(config);
for (i=0; i<item; i++) {
dir = (DIRRES *)config->GetNextRes(R_DIRECTOR, (RES *)dir);
Expand Down Expand Up @@ -1266,9 +1264,10 @@ int main(int argc, char *argv[])
} else {
heart_beat = 0;
}
UA_sock = bnet_connect(NULL, 5, 15, heart_beat, "Director daemon", dir->address,
NULL, dir->DIRport, false);
if (UA_sock == NULL) {

UA_sock = New(BSOCK_TCP);
if (!UA_sock->connect(NULL, 5, 15, heart_beat, "Director daemon", dir->address, NULL, dir->DIRport, false)) {
delete UA_sock;
terminate_console(0);
return 1;
}
Expand Down
1 change: 1 addition & 0 deletions src/dird/backup.c
Expand Up @@ -563,6 +563,7 @@ bool do_native_backup(JCR *jcr)
if (jcr->file_bsock) {
jcr->file_bsock->signal(BNET_TERMINATE);
jcr->file_bsock->close();
delete jcr->file_bsock;
jcr->file_bsock = NULL;
}

Expand Down
12 changes: 9 additions & 3 deletions src/dird/fd_cmds.c
Expand Up @@ -92,10 +92,11 @@ extern DIRRES *director;

int connect_to_file_daemon(JCR *jcr, int retry_interval, int max_retry_time, bool verbose, bool start_job)
{
BSOCK *fd = new_bsock();
BSOCK *fd;
char ed1[30];
utime_t heart_beat;

fd = New(BSOCK_TCP);
if (jcr->res.client->heartbeat_interval) {
heart_beat = jcr->res.client->heartbeat_interval;
} else {
Expand All @@ -111,8 +112,8 @@ int connect_to_file_daemon(JCR *jcr, int retry_interval, int max_retry_time, boo
if (!fd->connect(jcr,retry_interval,max_retry_time, heart_beat, name,
jcr->res.client->address, NULL,
jcr->res.client->FDport, verbose)) {
fd->destroy();
fd = NULL;
delete fd;
fd = NULL;
}

if (fd == NULL) {
Expand Down Expand Up @@ -881,6 +882,7 @@ bool cancel_file_daemon_job(UAContext *ua, JCR *jcr)
}
fd->signal(BNET_TERMINATE);
fd->close();
delete ua->jcr->file_bsock;
ua->jcr->file_bsock = NULL;
jcr->file_bsock->set_terminated();
jcr->my_thread_send_signal(TIMEOUT_SIGNAL);
Expand Down Expand Up @@ -912,6 +914,7 @@ void do_native_client_status(UAContext *ua, CLIENTRES *client, char *cmd)
client->name());
if (ua->jcr->file_bsock) {
ua->jcr->file_bsock->close();
delete ua->jcr->file_bsock;
ua->jcr->file_bsock = NULL;
}
return;
Expand All @@ -931,6 +934,7 @@ void do_native_client_status(UAContext *ua, CLIENTRES *client, char *cmd)

fd->signal(BNET_TERMINATE);
fd->close();
delete ua->jcr->file_bsock;
ua->jcr->file_bsock = NULL;

return;
Expand Down Expand Up @@ -961,6 +965,7 @@ void do_client_resolve(UAContext *ua, CLIENTRES *client)
client->name());
if (ua->jcr->file_bsock) {
ua->jcr->file_bsock->close();
delete ua->jcr->file_bsock;
ua->jcr->file_bsock = NULL;
}
return;
Expand All @@ -982,6 +987,7 @@ void do_client_resolve(UAContext *ua, CLIENTRES *client)

fd->signal(BNET_TERMINATE);
fd->close();
delete ua->jcr->file_bsock;
ua->jcr->file_bsock = NULL;

return;
Expand Down
3 changes: 3 additions & 0 deletions src/dird/job.c
Expand Up @@ -1299,12 +1299,14 @@ void dird_free_jcr_pointers(JCR *jcr)
if (jcr->file_bsock) {
Dmsg0(200, "Close File bsock\n");
jcr->file_bsock->close();
delete jcr->file_bsock;
jcr->file_bsock = NULL;
}

if (jcr->store_bsock) {
Dmsg0(200, "Close Store bsock\n");
jcr->store_bsock->close();
delete jcr->store_bsock;
jcr->store_bsock = NULL;
}

Expand Down Expand Up @@ -1855,6 +1857,7 @@ bool select_next_rstore(JCR *jcr, bootstrap_info &info)
*/
if (jcr->store_bsock) {
jcr->store_bsock->close();
delete jcr->store_bsock;
jcr->store_bsock = NULL;
}

Expand Down
1 change: 1 addition & 0 deletions src/dird/msgchan.c
Expand Up @@ -521,6 +521,7 @@ extern "C" void *device_thread(void *arg)
}
UnlockRes();
jcr->store_bsock->close();
delete jcr->store_bsock;
jcr->store_bsock = NULL;
break;

Expand Down
1 change: 1 addition & 0 deletions src/dird/restore.c
Expand Up @@ -369,6 +369,7 @@ static inline bool do_native_restore_bootstrap(JCR *jcr)
if (jcr->file_bsock) {
jcr->file_bsock->signal(BNET_TERMINATE);
jcr->file_bsock->close();
delete jcr->file_bsock;
jcr->file_bsock = NULL;
}

Expand Down
17 changes: 15 additions & 2 deletions src/dird/sd_cmds.c
Expand Up @@ -73,14 +73,16 @@ static char changerdrivesresponse[] =
bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
int max_retry_time, bool verbose)
{
BSOCK *sd = new_bsock();
BSOCK *sd;
STORERES *store;
utime_t heart_beat;

if (jcr->store_bsock) {
return true; /* already connected */
}

sd = New(BSOCK_TCP);

/*
* If there is a write storage use it
*/
Expand All @@ -103,7 +105,7 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
sd->set_source_address(me->DIRsrc_addr);
if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
store->address, NULL, store->SDport, verbose)) {
sd->destroy();
delete sd;
sd = NULL;
}

Expand All @@ -115,6 +117,7 @@ bool connect_to_storage_daemon(JCR *jcr, int retry_interval,

if (!authenticate_storage_daemon(jcr, store)) {
sd->close();
delete jcr->store_bsock;
jcr->store_bsock = NULL;
return false;
}
Expand Down Expand Up @@ -148,6 +151,7 @@ void close_sd_bsock(UAContext *ua)
if (ua->jcr->store_bsock) {
ua->jcr->store_bsock->signal(BNET_TERMINATE);
ua->jcr->store_bsock->close();
delete ua->jcr->store_bsock;
ua->jcr->store_bsock = NULL;
}
}
Expand Down Expand Up @@ -629,6 +633,7 @@ bool cancel_storage_daemon_job(UAContext *ua, STORERES *store, char *JobId)
}
sd->signal(BNET_TERMINATE);
sd->close();
delete control_jcr->store_bsock;
control_jcr->store_bsock = NULL;

free_jcr(control_jcr);
Expand Down Expand Up @@ -677,6 +682,7 @@ bool cancel_storage_daemon_job(UAContext *ua, JCR *jcr, bool silent)
}
sd->signal(BNET_TERMINATE);
sd->close();
delete ua->jcr->store_bsock;
ua->jcr->store_bsock = NULL;
if (silent) {
jcr->sd_canceled = true;
Expand Down Expand Up @@ -743,23 +749,29 @@ void do_native_storage_status(UAContext *ua, STORERES *store, char *cmd)
store->name());
if (ua->jcr->store_bsock) {
ua->jcr->store_bsock->close();
delete ua->jcr->store_bsock;
ua->jcr->store_bsock = NULL;
}
return;
}

Dmsg0(20, _("Connected to storage daemon\n"));
sd = ua->jcr->store_bsock;
if (cmd) {
sd->fsend(dotstatuscmd, cmd);
} else {
sd->fsend(statuscmd);
}

while (sd->recv() >= 0) {
ua->send_msg("%s", sd->msg);
}

sd->signal( BNET_TERMINATE);
sd->close();
delete ua->jcr->store_bsock;
ua->jcr->store_bsock = NULL;

return;
}

Expand Down Expand Up @@ -893,6 +905,7 @@ bool do_storage_resolve(UAContext *ua, STORERES *store)

sd->signal(BNET_TERMINATE);
sd->close();
delete ua->jcr->store_bsock;
ua->jcr->store_bsock = NULL;

return true;
Expand Down
60 changes: 40 additions & 20 deletions src/dird/ua_cmds.c
Expand Up @@ -766,6 +766,7 @@ static inline int setbwlimit_filed(UAContext *ua, CLIENTRES *client,

ua->jcr->file_bsock->signal(BNET_TERMINATE);
ua->jcr->file_bsock->close();
delete ua->jcr->file_bsock;
ua->jcr->file_bsock = NULL;
ua->jcr->res.client = NULL;
ua->jcr->max_bandwidth = 0;
Expand Down Expand Up @@ -815,6 +816,7 @@ static inline int setbwlimit_stored(UAContext *ua, STORERES *store,

ua->jcr->store_bsock->signal(BNET_TERMINATE);
ua->jcr->store_bsock->close();
delete ua->jcr->store_bsock;
ua->jcr->store_bsock = NULL;
ua->jcr->res.wstore = NULL;
ua->jcr->max_bandwidth = 0;
Expand Down Expand Up @@ -985,57 +987,73 @@ static void do_storage_setdebug(UAContext *ua, STORERES *store, int level, int t
lstore.store = store;
pm_strcpy(lstore.store_source, _("unknown source"));
set_wstorage(jcr, &lstore);
/* Try connecting for up to 15 seconds */

/*
* Try connecting for up to 15 seconds
*/
ua->send_msg(_("Connecting to Storage daemon %s at %s:%d\n"),
store->name(), store->address, store->SDport);
store->name(), store->address, store->SDport);

if (!connect_to_storage_daemon(jcr, 1, 15, false)) {
ua->error_msg(_("Failed to connect to Storage daemon.\n"));
return;
}

Dmsg0(120, _("Connected to storage daemon\n"));
sd = jcr->store_bsock;
sd->fsend("setdebug=%d trace=%d\n", level, trace_flag);
if (sd->recv() >= 0) {
ua->send_msg("%s", sd->msg);
}

sd->signal(BNET_TERMINATE);
sd->close();
delete jcr->store_bsock;
jcr->store_bsock = NULL;

return;
}

/*
* For the client, we have the following values that can be set
* level = debug level
* trace = send debug output to a file
* hangup = how many records to send to SD before hanging up
* obviously this is most useful for testing restarting
* failed jobs.
* For the client, we have the following values that can be set :
* level = debug level
* trace = send debug output to a file
* hangup = how many records to send to SD before hanging up
* obviously this is most useful for testing restarting
* failed jobs.
*/
static void do_client_setdebug(UAContext *ua, CLIENTRES *client,
int level, int trace, int hangup)
static void do_client_setdebug(UAContext *ua, CLIENTRES *client, int level, int trace, int hangup)
{
BSOCK *fd;

/* Connect to File daemon */

/*
* Connect to File daemon
*/
ua->jcr->res.client = client;
/* Try to connect for 15 seconds */

/*
* Try to connect for 15 seconds
*/
ua->send_msg(_("Connecting to Client %s at %s:%d\n"),
client->name(), client->address, client->FDport);
client->name(), client->address, client->FDport);

if (!connect_to_file_daemon(ua->jcr, 1, 15, false, false)) {
ua->error_msg(_("Failed to connect to Client.\n"));
return;
}

Dmsg0(120, "Connected to file daemon\n");
fd = ua->jcr->file_bsock;
fd->fsend("setdebug=%d trace=%d hangup=%d\n", level, trace, hangup);
if (fd->recv() >= 0) {
ua->send_msg("%s", fd->msg);
}

fd->signal(BNET_TERMINATE);
fd->close();
delete ua->jcr->file_bsock;
ua->jcr->file_bsock = NULL;

return;
}

Expand Down Expand Up @@ -1532,7 +1550,9 @@ static int estimate_cmd(UAContext *ua, const char *cmd)
return 1;
}

/* The level string change if accurate mode is enabled */
/*
* The level string change if accurate mode is enabled
*/
if (accurate >= 0) {
jcr->accurate = accurate;
} else {
Expand All @@ -1554,8 +1574,7 @@ static int estimate_cmd(UAContext *ua, const char *cmd)
}

/*
* If the job is in accurate mode, we send the list of
* all files to FD.
* If the job is in accurate mode, we send the list of all files to FD.
*/
Dmsg1(40, "estimate accurate=%d\n", jcr->accurate);
if (!send_accurate_current_files(jcr)) {
Expand All @@ -1571,6 +1590,7 @@ static int estimate_cmd(UAContext *ua, const char *cmd)
if (jcr->file_bsock) {
jcr->file_bsock->signal(BNET_TERMINATE);
jcr->file_bsock->close();
delete jcr->file_bsock;
jcr->file_bsock = NULL;
}
return 1;
Expand Down Expand Up @@ -1604,9 +1624,9 @@ static int reload_cmd(UAContext *ua, const char *cmd)
/*
* Delete Pool records (should purge Media with it).
*
* delete pool=<pool-name>
* delete volume pool=<pool-name> volume=<name>
* delete jobid=<jobid>
* delete pool=<pool-name>
* delete volume pool=<pool-name> volume=<name>
* delete jobid=<jobid>
*/
static int delete_cmd(UAContext *ua, const char *cmd)
{
Expand Down

0 comments on commit 49e41f5

Please sign in to comment.