Skip to content

Commit

Permalink
Add server view to dtop.
Browse files Browse the repository at this point in the history
  • Loading branch information
garlick committed Jun 3, 2011
1 parent 9bd5c8a commit c837f2a
Showing 1 changed file with 139 additions and 37 deletions.
176 changes: 139 additions & 37 deletions utils/dtop.c
Expand Up @@ -72,9 +72,16 @@ static const struct option longopts[] = {
#define GETOPT(ac,av,opt,lopt) getopt (ac,av,opt)
#endif

#ifndef MAXHOSTNAMELEN
#define MAXHOSTNAMELEN 64
#endif
#ifndef MAXPATHLEN
#define MAXPATHLEN 1024
#endif

typedef struct {
char host[17];
char aname[17];
char host[MAXHOSTNAMELEN];
char aname[MAXPATHLEN];
} Tpoolkey;

typedef struct {
Expand All @@ -92,15 +99,22 @@ typedef struct {
char *host;
double poll_sec;
pthread_t thread;
} Thread;
double rmbps;
double wmbps;
double iops;
double numfids;
double numreqs;
double totreqs;
} Server;

#define TOPWIN_LINES 7
#define SUBWIN

static List tpools = NULL;
static pthread_mutex_t tpools_lock = PTHREAD_MUTEX_INITIALIZER;
static List servers = NULL;
static pthread_mutex_t dtop_lock = PTHREAD_MUTEX_INITIALIZER;

static Thread *_start_reader (char *host, double poll_sec);
static Server *_server_create (char *host, double poll_sec);
static void _destroy_tpool (Tpool *tp);
static void _curses_watcher (double update_secs);

Expand All @@ -122,7 +136,6 @@ main (int argc, char *argv[])
{
hostlist_t hl = NULL;
hostlist_iterator_t itr;
List readers;
double poll_sec = 1;
char *host;
int c;
Expand Down Expand Up @@ -160,14 +173,14 @@ main (int argc, char *argv[])
if (!(tpools = list_create ((ListDelF)_destroy_tpool)))
err_exit ("out of memory");

/* Launch readers - create a list of Threads.
/* Launch readers - create a list of Servers.
*/
if (!(readers = list_create (NULL)))
if (!(servers = list_create (NULL)))
err_exit ("out of memory");
if (!(itr = hostlist_iterator_create (hl)))
err_exit ("out of memory");
while ((host = hostlist_next (itr))) {
if (!(list_append (readers, _start_reader (host, poll_sec))))
if (!(list_append (servers, _server_create (host, poll_sec))))
err_exit ("out of memory");
}
hostlist_iterator_destroy (itr);
Expand All @@ -194,7 +207,7 @@ _update_display_topwin (WINDOW *win)
ListIterator itr;
Tpool *tp;

pthread_mutex_lock (&tpools_lock);
pthread_mutex_lock (&dtop_lock);
if (!(itr = list_iterator_create (tpools)))
msg_exit ("out of memory");
while ((tp = list_next (itr))) {
Expand Down Expand Up @@ -227,7 +240,7 @@ _update_display_topwin (WINDOW *win)
wmbps += sample_rate (tp->wbytes, now) / (1024*1024);
}
list_iterator_destroy (itr);
pthread_mutex_unlock (&tpools_lock);
pthread_mutex_unlock (&dtop_lock);

wclear (win);
mvwprintw (win, y, 0, "%s", "DIOD - Distributed I/O Daemon");
Expand Down Expand Up @@ -257,7 +270,7 @@ _update_display_topwin (WINDOW *win)
}

static void
_update_display_subwin (WINDOW *win)
_update_display_normal (WINDOW *win)
{
ListIterator itr;
Tpool *tp;
Expand All @@ -269,41 +282,111 @@ _update_display_subwin (WINDOW *win)

wattron (win, A_REVERSE);
wprintw (win,
"%10.10s %14.14s %5.5s %5.5s %5.5s %5.5s %5.5s %5.5s %5.5s %5.5s %5.5s\n",
"%10.10s %14.14s %6.6s %5.5s %5.5s %5.5s %5.5s %5.5s\n",
"server", "aname", "fids", "reqs", "queue", "rMB/s", "wMB/s",
"IOPS", "ga/s", "wlk/s", "dir/s");
"IOPS");
wattroff (win, A_REVERSE);

pthread_mutex_lock (&tpools_lock);
pthread_mutex_lock (&dtop_lock);
if (!(itr = list_iterator_create (tpools)))
msg_exit ("out of memory");
while ((tp = list_next (itr))) {
if (sample_val (tp->numfids, t) == 0)
continue;
mvwprintw (win, y++, 0,
"%10.10s %14.14s %5.0f %5.0f %5.0f %5.0f %5.0f %5.0f %5.0f %5.0f %5.0f",
"%10.10s %14.14s %6.0f %5.0f %5.0f %5.0f %5.0f %5.0f",
tp->key.host, tp->key.aname,
sample_val (tp->numfids, t),
sample_rate (tp->totreqs, t),
sample_val (tp->numreqs, t),
sample_rate (tp->rbytes, t) / (1024*1024),
sample_rate (tp->wbytes, t) / (1024*1024),
sample_rate (tp->iops, t),
sample_rate (tp->nreqs[P9_TGETATTR], t),
sample_rate (tp->nreqs[P9_TWALK], t),
sample_rate (tp->nreqs[P9_TREADDIR], t));
sample_rate (tp->iops, t));
}
list_iterator_destroy (itr);
pthread_mutex_unlock (&tpools_lock);
pthread_mutex_unlock (&dtop_lock);
wrefresh (win);
}

static int
_match_serverhost (Server *sp, char *host)
{
if (!strcmp (sp->host, host))
return 1;
return 0;
}

static void
_update_display_server (WINDOW *win)
{
ListIterator itr;
Server *sp;
Tpool *tp;
time_t t = time (NULL);
int y = 0;

wclear (win);
wmove (win, y++, 0);

wattron (win, A_REVERSE);
wprintw (win, "%10.10s %14.14s %6.6s %5.5s %5.5s %5.5s %5.5s %5.5s\n",
"server", "", "fids", "reqs", "queue", "rMB/s", "wMB/s", "IOPS");
wattroff (win, A_REVERSE);

/* zero server stats */
if (!(itr = list_iterator_create (servers)))
msg_exit ("out of memory");
while ((sp = list_next (itr))) {
sp->numfids = sp->numreqs = sp->totreqs = 0;
sp->rmbps = sp->wmbps = sp->iops = 0;
}
list_iterator_destroy (itr);

if (!(itr = list_iterator_create (tpools)))
msg_exit ("out of memory");
while ((tp = list_next (itr))) {
sp = list_find_first (servers, (ListFindF)_match_serverhost,
tp->key.host);
if (sp) {
sp->numfids += sample_val (tp->numfids, t);
sp->totreqs += sample_rate (tp->totreqs, t);
sp->numreqs += sample_val (tp->numreqs, t); /* queue */
sp->rmbps += sample_rate (tp->rbytes, t) / (1024*1024);
sp->wmbps += sample_rate (tp->wbytes, t) / (1024*1024);
sp->iops += sample_rate (tp->iops, t);
}
}
list_iterator_destroy (itr);

/* iterate thru servers displaying data */
if (!(itr = list_iterator_create (servers)))
msg_exit ("out of memory");
while ((sp = list_next (itr))) {
mvwprintw (win, y++, 0,
"%10.10s %14.14s %6.0f %5.0f %5.0f %5.0f %5.0f %5.0f",
sp->host, "", sp->numfids, sp->totreqs, sp->numreqs,
sp->rmbps, sp->wmbps, sp->iops);
}
list_iterator_destroy (itr);

wrefresh (win);
}

static void
_update_display_aname (WINDOW *win)
{
wclear (win);
wrefresh (win);
}

typedef enum {VIEW_NORMAL, VIEW_SERVER, VIEW_ANAME} view_t;
static void
_curses_watcher (double update_secs)
{
WINDOW *topwin;
WINDOW *subwin;
int c;
view_t view = VIEW_NORMAL;

if (!(topwin = initscr ()))
err_exit ("error initializing parent window");
Expand All @@ -318,13 +401,32 @@ _curses_watcher (double update_secs)

while (!isendwin ()) {
_update_display_topwin (topwin);
_update_display_subwin (subwin);
switch (view) {
case VIEW_NORMAL:
_update_display_normal (subwin);
break;
case VIEW_ANAME:
_update_display_aname (subwin);
break;
case VIEW_SERVER:
_update_display_server (subwin);
break;
}
switch ((c = getch ())) {
case 'q':
case 0x03:
delwin (subwin);
endwin ();
break;
case 's': /* server view */
view = VIEW_SERVER;
break;
case 'a': /* aname view */
view = VIEW_ANAME;
break;
case 'n': /* normal view */
view = VIEW_NORMAL;
break;
case ERR: /* timeout */
break;
}
Expand Down Expand Up @@ -397,7 +499,7 @@ _update (char *host, time_t t, char *s)
snprintf (key.host, sizeof(key.host), "%s", host);
snprintf (key.aname, sizeof(key.aname), "%s", stats.name);

pthread_mutex_lock (&tpools_lock);
pthread_mutex_lock (&dtop_lock);
if (!(tp = list_find_first (tpools, (ListFindF)_match_tpool, &key))) {
tp = _create_tpool (&key);
if (!list_append (tpools, tp))
Expand All @@ -412,7 +514,7 @@ _update (char *host, time_t t, char *s)
sample_update (tp->totreqs, (double)_sum_nreqs(&stats), t);
for (i = 0; i < sizeof(tp->nreqs)/sizeof(tp->nreqs[0]); i++)
sample_update (tp->nreqs[i], (double)stats.nreqs[i], t);
pthread_mutex_unlock (&tpools_lock);
pthread_mutex_unlock (&dtop_lock);

if (stats.name)
free (stats.name);
Expand All @@ -421,7 +523,7 @@ _update (char *host, time_t t, char *s)
static void *
_reader (void *arg)
{
Thread *tp = (Thread *)arg;
Server *sp = (Server *)arg;
int fd = -1, len;
Npcfid *root = NULL;
char buf[8192], *s, *p;
Expand All @@ -430,7 +532,7 @@ _reader (void *arg)
for (;;) {
len = -1;
if (fd == -1)
fd = diod_sock_connect (tp->host, "564", DIOD_SOCK_QUIET);
fd = diod_sock_connect (sp->host, "564", DIOD_SOCK_QUIET);
if (fd != -1 && root == NULL)
root = npc_mount (fd, 65536, "ctl", diod_auth);
if (fd != -1 && root != NULL) {
Expand All @@ -443,7 +545,7 @@ _reader (void *arg)
p = strchr (s, '\n');
if (p)
*p++ = '\0';
_update (tp->host, now, s);
_update (sp->host, now, s);
}
}
if (fd != -1 && (root == NULL || len < 0)) {
Expand All @@ -457,26 +559,26 @@ _reader (void *arg)
fd = -1;
}
}
usleep ((useconds_t)(tp->poll_sec * 1E6));
usleep ((useconds_t)(sp->poll_sec * 1E6));
}
return NULL;
}

static Thread *
_start_reader (char *host, double poll_sec)
static Server *
_server_create (char *host, double poll_sec)
{
Thread *tp;
Server *sp;
int err;

if (!(tp = malloc (sizeof (Thread))))
if (!(sp = malloc (sizeof (Server))))
err_exit ("out of memory");
memset (tp, 0, sizeof (*tp));
tp->poll_sec = poll_sec;
if (!(tp->host = strdup (host)))
memset (sp, 0, sizeof (*sp));
sp->poll_sec = poll_sec;
if (!(sp->host = strdup (host)))
err_exit ("out of memory");
if ((err = pthread_create (&tp->thread, NULL, _reader, tp)))
if ((err = pthread_create (&sp->thread, NULL, _reader, sp)))
errn_exit (err, "pthread_create");
return tp;
return sp;
}

/*
Expand Down

0 comments on commit c837f2a

Please sign in to comment.