Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions include/haproxy/buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ static inline void __b_put_varint(struct buffer *b, uint64_t v)
v = (v - 0xF0) >> 4;

while (1) {
if (tail++ == wrap)
if (++tail == wrap)
tail -= size;
data++;
if (v < 0x80)
Expand Down Expand Up @@ -798,7 +798,7 @@ static inline int b_put_varint(struct buffer *b, uint64_t v)
v = (v - 0xF0) >> 4;

while (1) {
if (tail++ == wrap)
if (++tail == wrap)
tail -= size;
data++;
if (data == size || v < 0x80)
Expand Down Expand Up @@ -837,7 +837,7 @@ static inline int b_get_varint(struct buffer *b, uint64_t *vptr)
v = *head;
bits += 4;
while (1) {
if (head++ == wrap)
if (++head == wrap)
head -= size;
data--;
if (!data || !(*head & 0x80))
Expand Down Expand Up @@ -879,7 +879,7 @@ static inline int b_peek_varint(struct buffer *b, size_t ofs, uint64_t *vptr)
v = *head;
bits += 4;
while (1) {
if (head++ == wrap)
if (++head == wrap)
head -= size;
data--;
if (!data || !(*head & 0x80))
Expand Down
12 changes: 12 additions & 0 deletions include/haproxy/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@
*/
#define LIST_INLIST(el) ((el)->n != (el))

/* atomically checks if the list element's next pointer points to anything
* different from itself, implying the element should be part of a list. This
* usually is similar to LIST_INLIST() except that while that one might be
* instrumented using debugging code to perform further consistency checks,
* the macro below guarantees to always perform a single atomic test and is
* safe to use with barriers.
*/
#define LIST_INLIST_ATOMIC(el) ({ \
typeof(el) __ptr = (el); \
HA_ATOMIC_LOAD(&(__ptr)->n) != __ptr; \
})

/* returns a pointer of type <pt> to a structure following the element
* which contains list head <lh>, which is known as element <el> in
* struct pt.
Expand Down
2 changes: 1 addition & 1 deletion include/haproxy/resolvers-t.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ struct resolv_answer_item {
uint16_t weight; /* SRV type weight */
uint16_t port; /* SRV type port */
uint16_t data_len; /* number of bytes in the <data> field below */
struct eb32_node link; /* linking node */
union {
struct sockaddr_in in4; /* IPv4 address for RTYPE_A */
struct sockaddr_in6 in6; /* IPv6 address for RTYPE_AAAA */
Expand All @@ -119,7 +120,6 @@ struct resolv_answer_item {
unsigned int last_seen; /* When was the answer was last seen */
struct resolv_answer_item *ar_item; /* pointer to a RRset from the additional section, if exists */
struct list attached_servers; /* attached server head */
struct eb32_node link; /* linking node */
};

struct resolv_response {
Expand Down
3 changes: 3 additions & 0 deletions include/haproxy/task-t.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
#define TASK_F_USR1 0x00010000 /* preserved user flag 1, application-specific, def:0 */
/* unused: 0x20000..0x80000000 */

/* These flags are persistent across scheduler calls */
#define TASK_PERSISTENT (TASK_SHARED_WQ | TASK_SELF_WAKING | TASK_KILLED | \
TASK_HEAVY | TASK_F_TASKLET | TASK_F_USR1)

struct notification {
struct list purge_me; /* Part of the list of signals to be purged in the
Expand Down
24 changes: 18 additions & 6 deletions src/dns.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,21 @@ ssize_t dns_recv_nameserver(struct dns_nameserver *ns, void *data, size_t size)

ds->rx_msg.len = 0;

/* This barrier is here to ensure that all data is
* stored if the appctx detect the elem is out of the
* list.
*/
__ha_barrier_store();

LIST_DEL_INIT(&ds->waiter);

if (ds->appctx) {
/* This second barrier is here to ensure that
* the waked up appctx won't miss that the elem
* is removed from the list.
*/
__ha_barrier_store();

/* awake appctx because it may have other
* message to receive
*/
Expand Down Expand Up @@ -618,10 +630,8 @@ static void dns_session_io_handler(struct appctx *appctx)
* delete from the list
*/

/* lock the dns_stream_server containing lists heads */
HA_SPIN_LOCK(DNS_LOCK, &ds->dss->lock);

if (!LIST_INLIST(&ds->waiter)) {
__ha_barrier_load();
if (!LIST_INLIST_ATOMIC(&ds->waiter)) {
while (1) {
uint16_t query_id;
struct eb32_node *eb;
Expand Down Expand Up @@ -700,9 +710,13 @@ static void dns_session_io_handler(struct appctx *appctx)
* wait_sess list where the task processing
* response will pop available responses
*/
HA_SPIN_LOCK(DNS_LOCK, &ds->dss->lock);

BUG_ON(LIST_INLIST(&ds->waiter));
LIST_APPEND(&ds->dss->wait_sess, &ds->waiter);

HA_SPIN_UNLOCK(DNS_LOCK, &ds->dss->lock);

/* awake the task processing the responses */
task_wakeup(ds->dss->task_rsp, TASK_WOKEN_INIT);

Expand All @@ -712,14 +726,12 @@ static void dns_session_io_handler(struct appctx *appctx)
if (!LIST_INLIST(&ds->waiter)) {
/* there is no more pending data to read and the con was closed by the server side */
if (!co_data(si_oc(si)) && (si_oc(si)->flags & CF_SHUTW)) {
HA_SPIN_UNLOCK(DNS_LOCK, &ds->dss->lock);
goto close;
}
}

}

HA_SPIN_UNLOCK(DNS_LOCK, &ds->dss->lock);
return;
close:
si_shutw(si);
Expand Down
17 changes: 10 additions & 7 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -526,19 +526,18 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
}

budgets[queue]--;
t = (struct task *)LIST_ELEM(tl_queues[queue].n, struct tasklet *, list);
state = t->state & (TASK_SHARED_WQ|TASK_SELF_WAKING|TASK_HEAVY|TASK_F_TASKLET|TASK_KILLED|TASK_F_USR1|TASK_KILLED);

th_ctx->flags &= ~TH_FL_STUCK; // this thread is still running
activity[tid].ctxsw++;

t = (struct task *)LIST_ELEM(tl_queues[queue].n, struct tasklet *, list);
ctx = t->context;
process = t->process;
t->calls++;
th_ctx->current = t;
th_ctx->flags &= ~TH_FL_STUCK; // this thread is still running

_HA_ATOMIC_DEC(&th_ctx->rq_total);

if (state & TASK_F_TASKLET) {
if (t->state & TASK_F_TASKLET) {
uint64_t before = 0;

LIST_DEL_INIT(&((struct tasklet *)t)->list);
Expand All @@ -555,7 +554,7 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
#endif
}

state = _HA_ATOMIC_XCHG(&t->state, state);
state = _HA_ATOMIC_FETCH_AND(&t->state, TASK_PERSISTENT);
__ha_barrier_atomic_store();

if (likely(!(state & TASK_KILLED))) {
Expand All @@ -582,7 +581,11 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])

LIST_DEL_INIT(&((struct tasklet *)t)->list);
__ha_barrier_store();
state = _HA_ATOMIC_XCHG(&t->state, state|TASK_RUNNING|TASK_F_USR1);

state = t->state;
while (!_HA_ATOMIC_CAS(&t->state, &state, (state & TASK_PERSISTENT) | TASK_RUNNING))
;

__ha_barrier_atomic_store();

/* OK then this is a regular task */
Expand Down