Pullup the following (requested by jdolecek in ticket #1191):
	sys/kern/kern_event.c	r1.110-1.115 (via patch)

fix a race in kqueue_scan() - when multiple threads check the same
kqueue, another thread could see an empty kqueue while a kevent
was being checked for re-firing and re-queued
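
The window, as a minimal userspace model (a sketch only: a pthread
mutex stands in for kq_lock, and scanner, observer and count are
invented names, not the kernel code): the scanner drops the count
before it knows whether the kevent will re-fire, so a concurrent
checker can observe an apparently empty kqueue.

/*
 * Illustrative model of the race, not the kernel code.  "count"
 * plays the role of kq->kq_count; the mutex plays the role of kq_lock.
 */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int count = 1;			/* one pending event */

static void *
scanner(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&lock);
	count--;			/* old code: count dropped up front */
	pthread_mutex_unlock(&lock);

	/* ... re-check the event with the lock dropped ... */

	pthread_mutex_lock(&lock);
	count++;			/* event re-fired: re-queue it */
	pthread_mutex_unlock(&lock);
	return NULL;
}

static void *
observer(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&lock);
	if (count == 0)			/* can run in the window above */
		printf("observer: queue looks empty, event missed\n");
	pthread_mutex_unlock(&lock);
	return NULL;
}

int
main(void)
{
	pthread_t t1, t2;

	pthread_create(&t1, NULL, scanner, NULL);
	pthread_create(&t2, NULL, observer, NULL);
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);
	return 0;
}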

make sure to keep retrying if there are outstanding kevents even
if no kevent is found on the first pass through the queue, and
only decrement kq_count when actually completely done with the
kevent
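
The shape of the fix, again as a hedged userspace sketch rather than
the kernel code (influx and flux_cv here model the new KQ_FLUX_WAKEUP
machinery; all names in the sketch are invented): the count is only
decremented once the scan is completely done with the event, and a
checker that catches the queue in flux waits for the broadcast and
re-checks instead of reporting an empty queue.

/*
 * Illustrative model of the fixed behaviour, not the kernel code.
 */
#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t flux_cv = PTHREAD_COND_INITIALIZER;
static int count = 1;			/* one pending event */
static bool influx = false;

static void
scanner_pass(bool refired)
{
	pthread_mutex_lock(&lock);
	influx = true;			/* event is being re-checked */
	pthread_mutex_unlock(&lock);

	/* ... call the event filter with the lock dropped ... */

	pthread_mutex_lock(&lock);
	if (!refired)
		count--;		/* only now is the event really gone */
	influx = false;
	pthread_cond_broadcast(&flux_cv); /* analogue of KQ_FLUX_WAKEUP */
	pthread_mutex_unlock(&lock);
}

static int
checker(void)
{
	int n;

	pthread_mutex_lock(&lock);
	while (influx)			/* don't trust the count mid-scan */
		pthread_cond_wait(&flux_cv, &lock);
	n = count;
	pthread_mutex_unlock(&lock);
	return n;
}

int
main(void)
{
	scanner_pass(false);		/* event did not re-fire */
	return checker();		/* 0: queue now really empty */
}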

PR kern/50094 by Christof Meerwald

Also fixes timer latency in Go, as reported in
golang/go#42515 by Michael Pratt
MartinHusemann committed Feb 4, 2021
1 parent 676e668 commit 19e0c1a
Showing 1 changed file with 133 additions and 112 deletions.

sys/kern/kern_event.c
@@ -1,4 +1,4 @@
-/* $NetBSD: kern_event.c,v 1.104 2018/11/13 06:58:14 maxv Exp $ */
+/* $NetBSD: kern_event.c,v 1.104.4.1 2021/02/04 16:57:25 martin Exp $ */

/*-
* Copyright (c) 2008, 2009 The NetBSD Foundation, Inc.
@@ -58,7 +58,7 @@
*/

#include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: kern_event.c,v 1.104 2018/11/13 06:58:14 maxv Exp $");
+__KERNEL_RCSID(0, "$NetBSD: kern_event.c,v 1.104.4.1 2021/02/04 16:57:25 martin Exp $");

#include <sys/param.h>
#include <sys/systm.h>
@@ -166,6 +166,8 @@ static int kq_calloutmax = (4 * 1024);

extern const struct filterops sig_filtops;

#define KQ_FLUX_WAKEUP(kq) cv_broadcast(&kq->kq_cv)

/*
* Table for all system-defined filters.
* These should be listed in the numeric order of the EVFILT_* defines.
@@ -1226,7 +1228,10 @@ kqueue_check(const char *func, size_t line, const struct kqueue *kq)
}
count++;
if (count > kq->kq_count) {
goto bad;
panic("%s,%zu: kq=%p kq->kq_count(%d) != "
"count(%d), nmarker=%d",
func, line, kq, kq->kq_count, count,
nmarker);
}
} else {
nmarker++;
@@ -1240,11 +1245,6 @@ kqueue_check(const char *func, size_t line, const struct kqueue *kq)
#endif
}
}
if (kq->kq_count != count) {
bad:
panic("%s,%zu: kq=%p kq->kq_count(%d) != count(%d), nmarker=%d",
func, line, kq, kq->kq_count, count, nmarker);
}
}
#define kq_check(a) kqueue_check(__func__, __LINE__, (a))
#else /* defined(DEBUG) */
@@ -1268,7 +1268,7 @@ kqueue_scan(file_t *fp, size_t maxevents, struct kevent *ulistp,
struct timespec ats, sleepts;
struct knote *kn, *marker, morker;
size_t count, nkev, nevents;
int timeout, error, rv;
int timeout, error, rv, influx;
filedesc_t *fdp;

fdp = curlwp->l_fd;
@@ -1317,119 +1317,140 @@ kqueue_scan(file_t *fp, size_t maxevents, struct kevent *ulistp,
}
}
mutex_spin_exit(&kq->kq_lock);
} else {
/* mark end of knote list */
TAILQ_INSERT_TAIL(&kq->kq_head, marker, kn_tqe);
goto done;
}

/*
* Acquire the fdp->fd_lock interlock to avoid races with
* file creation/destruction from other threads.
*/
mutex_spin_exit(&kq->kq_lock);
mutex_enter(&fdp->fd_lock);
mutex_spin_enter(&kq->kq_lock);
/* mark end of knote list */
TAILQ_INSERT_TAIL(&kq->kq_head, marker, kn_tqe);
influx = 0;

while (count != 0) {
kn = TAILQ_FIRST(&kq->kq_head); /* get next knote */
while ((kn->kn_status & KN_MARKER) != 0) {
if (kn == marker) {
/* it's our marker, stop */
TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
if (count < maxevents || (tsp != NULL &&
(timeout = gettimeleft(&ats,
&sleepts)) <= 0))
goto done;
mutex_exit(&fdp->fd_lock);
goto retry;
}
/* someone else's marker. */
kn = TAILQ_NEXT(kn, kn_tqe);
/*
* Acquire the fdp->fd_lock interlock to avoid races with
* file creation/destruction from other threads.
*/
relock:
mutex_spin_exit(&kq->kq_lock);
mutex_enter(&fdp->fd_lock);
mutex_spin_enter(&kq->kq_lock);

while (count != 0) {
kn = TAILQ_FIRST(&kq->kq_head); /* get next knote */

if ((kn->kn_status & KN_MARKER) != 0 && kn != marker) {
if (influx) {
influx = 0;
KQ_FLUX_WAKEUP(kq);
}
kq_check(kq);
mutex_exit(&fdp->fd_lock);
(void)cv_wait(&kq->kq_cv, &kq->kq_lock);
goto relock;
}

TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
if (kn == marker) {
/* it's our marker, stop */
KQ_FLUX_WAKEUP(kq);
if (count == maxevents) {
mutex_exit(&fdp->fd_lock);
goto retry;
}
break;
}
KASSERT((kn->kn_status & KN_BUSY) == 0);

kq_check(kq);
kn->kn_status &= ~KN_QUEUED;
kn->kn_status |= KN_BUSY;
kq_check(kq);
if (kn->kn_status & KN_DISABLED) {
kn->kn_status &= ~KN_BUSY;
kq->kq_count--;
TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
kn->kn_status &= ~KN_QUEUED;
kn->kn_status |= KN_BUSY;
kq_check(kq);
if (kn->kn_status & KN_DISABLED) {
/* don't want disabled events */
continue;
}
if ((kn->kn_flags & EV_ONESHOT) == 0) {
mutex_spin_exit(&kq->kq_lock);
KASSERT(kn->kn_fop != NULL);
KASSERT(kn->kn_fop->f_event != NULL);
KERNEL_LOCK(1, NULL); /* XXXSMP */
KASSERT(mutex_owned(&fdp->fd_lock));
rv = (*kn->kn_fop->f_event)(kn, 0);
KERNEL_UNLOCK_ONE(NULL); /* XXXSMP */
mutex_spin_enter(&kq->kq_lock);
/* Re-poll if note was re-enqueued. */
if ((kn->kn_status & KN_QUEUED) != 0) {
kn->kn_status &= ~KN_BUSY;
/* don't want disabled events */
/* Re-enqueue raised kq_count, lower it again */
kq->kq_count--;
influx = 1;
continue;
}
if ((kn->kn_flags & EV_ONESHOT) == 0) {
mutex_spin_exit(&kq->kq_lock);
KASSERT(kn->kn_fop != NULL);
KASSERT(kn->kn_fop->f_event != NULL);
KERNEL_LOCK(1, NULL); /* XXXSMP */
KASSERT(mutex_owned(&fdp->fd_lock));
rv = (*kn->kn_fop->f_event)(kn, 0);
KERNEL_UNLOCK_ONE(NULL); /* XXXSMP */
mutex_spin_enter(&kq->kq_lock);
/* Re-poll if note was re-enqueued. */
if ((kn->kn_status & KN_QUEUED) != 0) {
kn->kn_status &= ~KN_BUSY;
continue;
}
if (rv == 0) {
/*
* non-ONESHOT event that hasn't
* triggered again, so de-queue.
*/
kn->kn_status &= ~(KN_ACTIVE|KN_BUSY);
continue;
}
}
/* XXXAD should be got from f_event if !oneshot. */
*kevp++ = kn->kn_kevent;
nkev++;
if (kn->kn_flags & EV_ONESHOT) {
/* delete ONESHOT events after retrieval */
kn->kn_status &= ~KN_BUSY;
mutex_spin_exit(&kq->kq_lock);
knote_detach(kn, fdp, true);
mutex_enter(&fdp->fd_lock);
mutex_spin_enter(&kq->kq_lock);
} else if (kn->kn_flags & EV_CLEAR) {
/* clear state after retrieval */
kn->kn_data = 0;
kn->kn_fflags = 0;
kn->kn_status &= ~(KN_QUEUED|KN_ACTIVE|KN_BUSY);
} else if (kn->kn_flags & EV_DISPATCH) {
kn->kn_status |= KN_DISABLED;
kn->kn_status &= ~(KN_QUEUED|KN_ACTIVE|KN_BUSY);
} else {
/* add event back on list */
kq_check(kq);
kn->kn_status |= KN_QUEUED;
kn->kn_status &= ~KN_BUSY;
TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
kq->kq_count++;
kq_check(kq);
}
if (nkev == kevcnt) {
/* do copyouts in kevcnt chunks */
mutex_spin_exit(&kq->kq_lock);
mutex_exit(&fdp->fd_lock);
error = (*keops->keo_put_events)
(keops->keo_private,
kevbuf, ulistp, nevents, nkev);
mutex_enter(&fdp->fd_lock);
mutex_spin_enter(&kq->kq_lock);
nevents += nkev;
nkev = 0;
kevp = kevbuf;
}
count--;
if (error != 0 || count == 0) {
/* remove marker */
TAILQ_REMOVE(&kq->kq_head, marker, kn_tqe);
break;
if (rv == 0) {
/*
* non-ONESHOT event that hasn't
* triggered again, so de-queue.
*/
kn->kn_status &= ~(KN_ACTIVE|KN_BUSY);
kq->kq_count--;
influx = 1;
continue;
}
}
done:
mutex_spin_exit(&kq->kq_lock);
mutex_exit(&fdp->fd_lock);
/* XXXAD should be got from f_event if !oneshot. */
*kevp++ = kn->kn_kevent;
nkev++;
if (kn->kn_flags & EV_ONESHOT) {
/* delete ONESHOT events after retrieval */
kn->kn_status &= ~KN_BUSY;
mutex_spin_exit(&kq->kq_lock);
knote_detach(kn, fdp, true);
mutex_enter(&fdp->fd_lock);
mutex_spin_enter(&kq->kq_lock);
} else if (kn->kn_flags & EV_CLEAR) {
/* clear state after retrieval */
kn->kn_data = 0;
kn->kn_fflags = 0;
kn->kn_status &= ~(KN_ACTIVE|KN_BUSY);
kq->kq_count--;
} else if (kn->kn_flags & EV_DISPATCH) {
kn->kn_status |= KN_DISABLED;
kn->kn_status &= ~(KN_ACTIVE|KN_BUSY);
kq->kq_count--;
} else {
/* add event back on list */
kq_check(kq);
kn->kn_status |= KN_QUEUED;
kn->kn_status &= ~KN_BUSY;
TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
kq_check(kq);
}
if (nkev == kevcnt) {
/* do copyouts in kevcnt chunks */
influx = 0;
KQ_FLUX_WAKEUP(kq);
mutex_spin_exit(&kq->kq_lock);
mutex_exit(&fdp->fd_lock);
error = (*keops->keo_put_events)
(keops->keo_private,
kevbuf, ulistp, nevents, nkev);
mutex_enter(&fdp->fd_lock);
mutex_spin_enter(&kq->kq_lock);
nevents += nkev;
nkev = 0;
kevp = kevbuf;
}
count--;
if (error != 0 || count == 0) {
/* remove marker */
TAILQ_REMOVE(&kq->kq_head, marker, kn_tqe);
break;
}
}
KQ_FLUX_WAKEUP(kq);
mutex_spin_exit(&kq->kq_lock);
mutex_exit(&fdp->fd_lock);

done:
if (nkev != 0) {
/* copyout remaining events */
error = (*keops->keo_put_events)(keops->keo_private,
