Skip to content

Commit

Permalink
add max concurrency parameter to grnslap.
Browse files Browse the repository at this point in the history
* src/grnslap.c: add max_con argument.
  • Loading branch information
daijiro committed Apr 26, 2009
1 parent de1fb9d commit f210435
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2009-04-26 Poe MORITA <morita at razil.jp>

* src/grnslap.c: add max_con argument.

2009-04-25 Tasuku SUENAGA <a at razil.jp>

* configure.ac: release 0.0.3.
Expand Down
49 changes: 37 additions & 12 deletions src/grnslap.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

#define DEFAULT_PORT 10041
#define DEFAULT_DEST "localhost"
#define DEFAULT_MAX_CONCURRENCY 10

static int port = DEFAULT_PORT;
static int proto = 'g';
static int max_con = DEFAULT_MAX_CONCURRENCY;

#include <stdarg.h>
static void
Expand Down Expand Up @@ -58,9 +60,10 @@ usage(void)
"Usage: grnslap [options...] [dest]\n"
"options:\n"
" -P <protocol>: http or gqtp (default: gqtp)\n"
" -m <max concurrency>: number of max concurrency (default: %d)\n"
" -p <port number>: server port number (default: %d)\n"
"dest: hostname (default: \"%s\")\n",
DEFAULT_PORT, DEFAULT_DEST);
DEFAULT_MAX_CONCURRENCY, DEFAULT_PORT, DEFAULT_DEST);
}

#define BUFSIZE 0x1000000
Expand All @@ -75,6 +78,7 @@ struct _session {
int stat;
int query_id;
int n_query;
int n_sessions;
};

static grn_com_event ev;
Expand All @@ -83,6 +87,9 @@ static grn_hash *sessions;
static int done = 0;
static int nsent = 0;
static int nrecv = 0;
static int etime_min = INT32_MAX;
static int etime_max = 0;
static int64_t etime_amount = 0;

static session *
session_open(grn_ctx *ctx, const char *dest, int port)
Expand Down Expand Up @@ -123,18 +130,24 @@ session_alloc(grn_ctx *ctx, const char *dest, int port)
static void
msg_handler(grn_ctx *ctx, grn_obj *msg)
{
uint32_t etime;
struct timeval tv;
grn_msg *m = (grn_msg *)msg;
grn_com *com = ((grn_msg *)msg)->peer;
session *s = com->opaque;
s->stat = 3;
/*
lprint(ctx, "%d %x %x %04d %04d %d (%d)", (int)ctx->rc,
m->header.proto, m->header.flags, s->n_query, s->query_id, com->fd,
(int)GRN_BULK_VSIZE(msg));
*/
gettimeofday(&tv, NULL);
etime = (tv.tv_sec - s->tv.tv_sec) * 1000000 + (tv.tv_usec - s->tv.tv_usec);
if (etime > etime_max) { etime_max = etime; }
if (etime < etime_min) { etime_min = etime; }
etime_amount += etime;
if (ctx->rc) { m->header.proto = 0; }
switch (m->header.proto) {
case GRN_COM_PROTO_GQTP :
if (GRN_BULK_VSIZE(msg) != 2) {
GRN_BULK_PUTC(ctx, msg, '\0');
lprint(ctx, "%8d(%4d) %8d : %s", s->query_id, s->n_sessions, etime, GRN_BULK_HEAD(msg));
}
if ((m->header.flags & GRN_QL_TAIL)) {
grn_com_queue_enque(ctx, &fsessions, (grn_com_queue_entry *)s);
nrecv++;
Expand Down Expand Up @@ -174,14 +187,13 @@ receiver(void *arg)
return NULL;
}

#define MAX_CON 100

static int
do_client(char *hostname)
{
int rc = -1;
char *buf;
grn_thread thread;
struct timeval tvb, tve;
grn_com_header sheader;
grn_ctx ctx_, *ctx = &ctx_;
grn_ctx_init(ctx, 0, GRN_ENC_DEFAULT);
Expand All @@ -199,14 +211,16 @@ do_client(char *hostname)
if (!grn_com_event_init(ctx, &ev, 1000, sizeof(grn_com))) {
ev.msg_handler = msg_handler;
if (!THREAD_CREATE(thread, receiver, NULL)) {
lprint(ctx, "begin: %d", nsent);
gettimeofday(&tvb, NULL);
lprint(ctx, "begin: max_concurrency=%d", max_con);
while (fgets(buf, BUFSIZE, stdin)) {
uint32_t size = strlen(buf) - 1;
session *s = session_alloc(ctx, hostname, port);
if (s) {
gettimeofday(&s->tv, NULL);
s->n_query++;
s->query_id = ++nsent;
s->n_sessions = (nsent - nrecv);
switch (proto) {
case 'H' :
case 'h' :
Expand All @@ -228,21 +242,29 @@ do_client(char *hostname)
} else {
fprintf(stderr, "grn_com_copen failed\n");
}
while ((nsent - nrecv) > MAX_CON) {
while ((nsent - nrecv) >= max_con) {
/* lprint(ctx, "s:%d r:%d", nsent, nrecv); */
usleep(1000);
}
if (!(nsent % 1000)) { lprint(ctx, " : %d", nsent); }
}
done = 1;
pthread_join(thread, NULL);
gettimeofday(&tve, NULL);
{
double qps;
uint64_t etime = (tve.tv_sec - tvb.tv_sec);
etime *= 1000000;
etime += (tve.tv_usec - tvb.tv_usec);
qps = (double)nsent * 1000000 / etime;
lprint(ctx, "end : n=%d min=%d max=%d avg=%d qps=%f etime=%d.%06d", nsent, etime_min, etime_max, (int)(etime_amount / nsent), qps, etime / 1000000, etime % 1000000);
}
{
session *s;
GRN_HASH_EACH(sessions, id, NULL, NULL, &s, {
session_close(ctx, s);
});
}
lprint(ctx, "end : %d", nsent);
rc = 0;
} else {
fprintf(stderr, "THREAD_CREATE failed\n");
Expand All @@ -266,19 +288,22 @@ enum {
int
main(int argc, char **argv)
{
char *portstr = NULL, *protostr = NULL;
char *portstr = NULL, *protostr = NULL, *maxconstr = NULL;
int r, i, mode = mode_client;
static grn_str_getopt_opt opts[] = {
{'p', NULL, NULL, 0, getopt_op_none},
{'P', NULL, NULL, 0, getopt_op_none},
{'m', NULL, NULL, 0, getopt_op_none},
{'h', NULL, NULL, mode_usage, getopt_op_update},
{'\0', NULL, NULL, 0, 0}
};
opts[0].arg = &portstr;
opts[1].arg = &protostr;
opts[2].arg = &maxconstr;
i = grn_str_getopt(argc, argv, opts, &mode);
if (portstr) { port = atoi(portstr); }
if (protostr) { proto = *protostr; }
if (maxconstr) { max_con = atoi(maxconstr); }
if (grn_init()) { return -1; }
switch (mode) {
case mode_client :
Expand Down

0 comments on commit f210435

Please sign in to comment.