Skip to content

Commit

Permalink
Merge pull request #250 from pspacek/tcp-64k
Browse files Browse the repository at this point in the history
64k TCP/DoT payload support
  • Loading branch information
jelu committed Aug 22, 2023
2 parents a540cca + 56e180c commit 92e74e1
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 56 deletions.
138 changes: 86 additions & 52 deletions src/dnsperf.c
Expand Up @@ -66,7 +66,7 @@

#define TIMEOUT_CHECK_TIME 100000

#define MAX_INPUT_DATA (64 * 1024)
#define MAX_INPUT_DATA (64 * 1024) + 2

#define MAX_SOCKETS 256

Expand Down Expand Up @@ -853,6 +853,40 @@ wait_for_start(void)
PERF_UNLOCK(&start_lock);
}

static inline void
bit_set(unsigned char* bits, unsigned int bit)
{
unsigned int shift, mask;

shift = 7 - (bit % 8);
mask = 1 << shift;

bits[bit / 8] |= mask;
}

static inline void
bit_clear(unsigned char* bits, unsigned int bit)
{
unsigned int shift, mask;

shift = 7 - (bit % 8);
mask = 1 << shift;

bits[bit / 8] &= ~mask;
}

static inline bool
bit_check(unsigned char* bits, unsigned int bit)
{
unsigned int shift;

shift = 7 - (bit % 8);

if ((bits[bit / 8] >> shift) & 0x01)
return true;
return false;
}

static void*
do_send(void* arg)
{
Expand All @@ -871,9 +905,10 @@ do_send(void* arg)
unsigned char packet_buffer[MAX_EDNS_PACKET];
unsigned char* base;
unsigned int length;
int n, i, any_inprogress = 0;
int n, i, any_inprogress = 0, sock = 0;
perf_result_t result;
bool all_fail;
unsigned char socketbits[(MAX_SOCKETS / 8) + 1] = {};

tinfo = (threadinfo_t*)arg;
config = tinfo->config;
Expand All @@ -900,6 +935,21 @@ do_send(void* arg)
now = perf_get_time();
}

/* Some sock might still be sending, try flush all of them */
if (any_inprogress) {
any_inprogress = 0;
for (i = 0; i < tinfo->nsocks; i++) {
if (!bit_check(socketbits, i)) {
continue;
}
if (!perf_net_sockready(tinfo->socks[i], threadpipe[0], TIMEOUT_CHECK_TIME)) {
any_inprogress = 1;
} else {
bit_clear(socketbits, i);
}
}
}

/* Rate limiting */
if (tinfo->max_qps > 0) {
/* the 1 second time slice where q_sent is calculated over */
Expand All @@ -910,36 +960,40 @@ do_send(void* arg)
}
/* limit QPS over the 1 second slice */
if (q_sent >= tinfo->max_qps) {
wait_us = q_slice - now;
if (config->qps_threshold_wait && wait_us > config->qps_threshold_wait) {
wait_us -= config->qps_threshold_wait;
struct timespec ts = { 0, 0 };
if (wait_us >= MILLION) {
ts.tv_sec = wait_us / MILLION;
ts.tv_nsec = (wait_us % MILLION) * 1000;
} else {
ts.tv_sec = 0;
ts.tv_nsec = wait_us * 1000;
if (!any_inprogress) { // only if nothing is in-progress
wait_us = q_slice - now;
if (config->qps_threshold_wait && wait_us > config->qps_threshold_wait) {
wait_us -= config->qps_threshold_wait;
struct timespec ts = { 0, 0 };
if (wait_us >= MILLION) {
ts.tv_sec = wait_us / MILLION;
ts.tv_nsec = (wait_us % MILLION) * 1000;
} else {
ts.tv_sec = 0;
ts.tv_nsec = wait_us * 1000;
}
nanosleep(&ts, NULL);
}
nanosleep(&ts, NULL);
}
now = perf_get_time();
continue;
}
/* handle stepping to the next window to send a query on */
if (req_time > now) {
wait_us = req_time - now;
if (config->qps_threshold_wait && wait_us > config->qps_threshold_wait) {
wait_us -= config->qps_threshold_wait;
struct timespec ts = { 0, 0 };
if (wait_us >= MILLION) {
ts.tv_sec = wait_us / MILLION;
ts.tv_nsec = (wait_us % MILLION) * 1000;
} else {
ts.tv_sec = 0;
ts.tv_nsec = wait_us * 1000;
if (!any_inprogress) { // only if nothing is in-progress
wait_us = req_time - now;
if (config->qps_threshold_wait && wait_us > config->qps_threshold_wait) {
wait_us -= config->qps_threshold_wait;
struct timespec ts = { 0, 0 };
if (wait_us >= MILLION) {
ts.tv_sec = wait_us / MILLION;
ts.tv_nsec = (wait_us % MILLION) * 1000;
} else {
ts.tv_sec = 0;
ts.tv_nsec = wait_us * 1000;
}
nanosleep(&ts, NULL);
}
nanosleep(&ts, NULL);
}
now = perf_get_time();
continue;
Expand All @@ -951,7 +1005,9 @@ do_send(void* arg)

/* Limit in-flight queries */
if (num_outstanding(stats) >= tinfo->max_outstanding) {
PERF_TIMEDWAIT(&tinfo->cond, &tinfo->lock, &times->stop_time_ns, NULL);
if (!any_inprogress) { // only if nothing is in-progress
PERF_TIMEDWAIT(&tinfo->cond, &tinfo->lock, &times->stop_time_ns, NULL);
}
PERF_UNLOCK(&tinfo->lock);
now = perf_get_time();
continue;
Expand All @@ -964,7 +1020,8 @@ do_send(void* arg)
i = tinfo->nsocks * 2;
all_fail = true;
while (i--) {
q->sock = tinfo->socks[tinfo->current_sock++ % tinfo->nsocks];
sock = tinfo->current_sock++ % tinfo->nsocks;
q->sock = tinfo->socks[sock];
switch (perf_net_sockready(q->sock, threadpipe[0], TIMEOUT_CHECK_TIME)) {
case 0:
if (config->verbose && !config->suppress.sockready) {
Expand Down Expand Up @@ -1060,6 +1117,7 @@ do_send(void* arg)
perf_log_warning("network congested, packet sending in progress");
}
any_inprogress = 1;
bit_set(socketbits, sock);
} else {
if (config->verbose && !config->suppress.sendfailed) {
char __s[256];
Expand Down Expand Up @@ -1185,29 +1243,6 @@ recv_one(threadinfo_t* tinfo, int which_sock,
return true;
}

static inline void
bit_set(unsigned char* bits, unsigned int bit)
{
unsigned int shift, mask;

shift = 7 - (bit % 8);
mask = 1 << shift;

bits[bit / 8] |= mask;
}

static inline bool
bit_check(unsigned char* bits, unsigned int bit)
{
unsigned int shift;

shift = 7 - (bit % 8);

if ((bits[bit / 8] >> shift) & 0x01)
return true;
return false;
}

static void*
do_recv(void* arg)
{
Expand All @@ -1217,7 +1252,7 @@ do_recv(void* arg)
received_query_t recvd[RECV_BATCH_SIZE] = { { 0, 0, 0, 0, 0, 0, false, false, 0 } };
unsigned int nrecvd;
int saved_errno;
unsigned char socketbits[MAX_SOCKETS / 8];
unsigned char socketbits[(MAX_SOCKETS / 8) + 1];
uint64_t now, latency;
query_info* q;
unsigned int current_socket, last_socket;
Expand Down Expand Up @@ -1640,8 +1675,7 @@ int main(int argc, char** argv)
perf_os_handlesignal(SIGINT, handle_sigint);
perf_os_blocksignal(SIGINT, false);
sock.fd = mainpipe[0];
result = perf_os_waituntilreadable(&sock, intrpipe[0],
times.stop_time - times.start_time);
result = perf_os_waituntilreadable(&sock, intrpipe[0], times.stop_time - times.start_time);
if (result == PERF_R_CANCELED)
interrupted = true;

Expand Down
4 changes: 2 additions & 2 deletions src/net.h
Expand Up @@ -28,8 +28,8 @@
#include <assert.h>
#include <stdbool.h>

#define TCP_RECV_BUF_SIZE (16 * 1024)
#define TCP_SEND_BUF_SIZE (4 * 1024)
#define TCP_RECV_BUF_SIZE (65535 + 2)
#define TCP_SEND_BUF_SIZE (65535 + 2)

struct perf_sockaddr {
union {
Expand Down
7 changes: 6 additions & 1 deletion src/net_tcp.c
Expand Up @@ -297,6 +297,9 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_
dnslen = ntohs(dnslen);
n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, 0, 0, 0);
if (n < 0) {
if (errno == EAGAIN) {
return 0;
}
int fd = perf__tcp_connect(sock), oldfd = ck_pr_load_int(&sock->fd);
ck_pr_store_int(&sock->fd, fd);
close(oldfd);
Expand Down Expand Up @@ -364,7 +367,9 @@ static int perf__tcp_sockready(struct perf_net_socket* sock, int pipe_fd, int64_
dnslen = ntohs(dnslen);
n = sendto(sock->fd, self->sendbuf + self->sending, dnslen + 2 - self->sending, 0, 0, 0);
if (n < 0) {
self->need_reconnect = true;
if (errno != EAGAIN) {
self->need_reconnect = true;
}
return 0;
}
self->sending += n;
Expand Down
2 changes: 1 addition & 1 deletion src/resperf.c
Expand Up @@ -61,7 +61,7 @@
#define DEFAULT_MAX_OUTSTANDING (64 * 1024)
#define DEFAULT_MAX_FALL_BEHIND 1000

#define MAX_INPUT_DATA (64 * 1024)
#define MAX_INPUT_DATA (64 * 1024) + 2

#define TIMEOUT_CHECK_TIME 5000000

Expand Down

0 comments on commit 92e74e1

Please sign in to comment.