kernel - usched_dfly revamp (8), add reschedule hints

* Add reschedule hints when issuing a read() on a pipe or socket, or
  issuing a blocking kevent() call.
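
  The hint itself is just a per-thread flag set at the top of the blocking
  path.  A minimal sketch of the pattern, matching the sys_pipe.c,
  sys_socket.c and kern_event.c hunks below:

      /*
       * At the top of pipe_read() and soo_read(), and in kern_kevent()
       * when the call can block: hint to usched_dfly that this thread
       * just crossed a batch demarcation point, i.e. a safe spot to
       * round-robin it early.
       */
      atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);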

* usched_dfly will force a reschedule after the round-robin count has
  passed the halfway point if it detects a scheduling hint.  This is
  an attempt to avoid rescheduling in the middle of a critical user
  operation (e.g. a postgres server holding internal locks).
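
  In the scheduler this amounts to honoring the hint once the thread has
  consumed at least half of its round-robin quantum, roughly (see the
  dfly_acquire_curproc() hunk below):

      /*
       * Thread is at a batch demarcation point and has already used at
       * least half of its quantum: force the reschedule now instead of
       * interrupting it mid-operation later.
       */
      if ((td->td_mpflags & TDF_MP_BATCH_DEMARC) &&
          lp->lwp_rrcount >= usched_dfly_rrinterval / 2) {
              force_resched = 1;
      }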

* Add kern.usched_dfly.fast_resched, which allows the scheduler to avoid
  interrupting a less desirable process with a more desirable process
  as long as the priority difference is not too great.

  However, the value defaults to 0, because enabling it has consequences
  for interactive responsiveness.
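
  With fast_resched enabled, an incoming thread must be better than the
  currently scheduled user thread by more than the configured delta to
  displace it immediately; if it is better but not by enough, the current
  thread is instead round-robined at the next scheduler tick.  A sketch of
  the core test (see the dfly_acquire_curproc() and dfly_setrunqueue_dd()
  hunks below):

      /*
       * Only take the cpu away immediately if we beat the current
       * uschedcp by more than usched_dfly_fast_resched priority units
       * (lower priority value == more desirable).
       */
      if ((dd->upri & ~PPQMASK) >
          (lp->lwp_priority & ~PPQMASK) + usched_dfly_fast_resched) {
              /* displace dd->uschedcp with lp */
      }

  It can be tuned at run-time with e.g. sysctl
  kern.usched_dfly.fast_resched=4 (the value 4 is only an illustration;
  0 disables the behavior and is the default).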

* When running pgbench we recommend leaving fast_resched disabled and
  instead running pgbench at idprio 15.  This works around an issue where
  the postgres server process(es) get interrupted by the pgbench processes,
  which causes the postgres server process(es) to hit internal lock
  conflicts more quickly and enter semaphore waits more often (when both
  pgbench and the postgres servers are running on the same machine).

  This is really a postgres server scaling issue.  Because the pgbench
  processes use so much less cpu than the postgres server processes, they
  are given a more desirable priority and can thus interrupt the postgres
  server processes.  We can't really 'fix' this in the scheduler without
  badly degrading normal interactive responsiveness for the system.

  Example:

  idprio 15 pgbench -j 80 -c 80 -T 60 -S bench
Matthew Dillon committed Sep 26, 2012
1 parent e3e6be1 commit a3ef5f2
Showing 5 changed files with 104 additions and 47 deletions.
5 changes: 4 additions & 1 deletion sys/kern/kern_event.c
@@ -609,6 +609,10 @@ kern_kevent(struct kqueue *kq, int nevents, int *res, void *uap,
struct knote marker;
struct lwkt_token *tok;

if (tsp_in == NULL || tsp_in->tv_sec || tsp_in->tv_nsec)
atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);


tsp = tsp_in;
*res = 0;

@@ -784,7 +788,6 @@ sys_kevent(struct kevent_args *uap)
} else {
tsp = NULL;
}

fp = holdfp(p->p_fd, uap->fd, -1);
if (fp == NULL)
return (EBADF);
2 changes: 2 additions & 0 deletions sys/kern/sys_pipe.c
@@ -394,6 +394,8 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
int bigread;
int bigcount;

atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

if (uio->uio_resid == 0)
return(0);

2 changes: 2 additions & 0 deletions sys/kern/sys_socket.c
@@ -76,6 +76,8 @@ soo_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
int error;
int msgflags;

atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

so = (struct socket *)fp->f_data;

if (fflags & O_FBLOCKING)
141 changes: 95 additions & 46 deletions sys/kern/usched_dfly.c
@@ -259,6 +259,7 @@ static int usched_dfly_weight1 = 200; /* keep thread on current cpu */
static int usched_dfly_weight2 = 180; /* synchronous peer's current cpu */
static int usched_dfly_weight3 = 40; /* number of threads on queue */
static int usched_dfly_weight4 = 160; /* availability of idle cores */
static int usched_dfly_fast_resched = 0;/* delta priority / resched */
static int usched_dfly_features = 0x8F; /* allow pulls */
static int usched_dfly_swmask = ~PPQMASK; /* allow pulls */
#endif
@@ -321,6 +322,11 @@ dfly_acquire_curproc(struct lwp *lp)
* uschedcp that isn't us and otherwise NULL it out.
*/
force_resched = 0;
if ((td->td_mpflags & TDF_MP_BATCH_DEMARC) &&
lp->lwp_rrcount >= usched_dfly_rrinterval / 2) {
force_resched = 1;
}

if (user_resched_wanted()) {
if (dd->uschedcp == lp)
force_resched = 1;
@@ -394,17 +400,22 @@
* push to another cpu. So don't steal on equal-priority even
* though it might appear to be more beneficial due to not
* having to switch back to the other thread's context.
*
* usched_dfly_fast_resched requires that two threads be
* significantly far apart in priority in order to interrupt.
*
* If better but not sufficiently far apart, the current
* uschedcp will be interrupted at the next scheduler clock.
*/
if (dd->uschedcp &&
(dd->upri & ~PPQMASK) >
(lp->lwp_priority & ~PPQMASK)) {
(lp->lwp_priority & ~PPQMASK) + usched_dfly_fast_resched) {
dd->uschedcp = lp;
dd->upri = lp->lwp_priority;
KKASSERT(lp->lwp_qcpu == dd->cpuid);
spin_unlock(&dd->spin);
break;
}

#ifdef SMP
/*
* We are not the current lwp, figure out the best cpu
@@ -700,31 +711,46 @@ dfly_setrunqueue_dd(dfly_pcpu_t rdd, struct lwp *lp)
spin_lock(&rdd->spin);
dfly_setrunqueue_locked(rdd, lp);

if (rgd == mycpu) {
if ((rdd->upri & ~PPQMASK) > (lp->lwp_priority & ~PPQMASK)) {
spin_unlock(&rdd->spin);
if (rdd->uschedcp == NULL) {
wakeup_mycpu(&rdd->helper_thread); /* XXX */
need_user_resched();
} else {
need_user_resched();
}
} else {
spin_unlock(&rdd->spin);
/*
* Potentially interrupt the currently-running thread
*/
if ((rdd->upri & ~PPQMASK) <= (lp->lwp_priority & ~PPQMASK)) {
/*
* Currently running thread is better or same, do not
* interrupt.
*/
spin_unlock(&rdd->spin);
} else if ((rdd->upri & ~PPQMASK) <= (lp->lwp_priority & ~PPQMASK) +
usched_dfly_fast_resched) {
/*
* Currently running thread is not better, but not so bad
* that we need to interrupt it. Let it run for one more
* scheduler tick.
*/
if (rdd->uschedcp &&
rdd->uschedcp->lwp_rrcount < usched_dfly_rrinterval) {
rdd->uschedcp->lwp_rrcount = usched_dfly_rrinterval - 1;
}
} else {
if ((rdd->upri & ~PPQMASK) > (lp->lwp_priority & ~PPQMASK)) {
spin_unlock(&rdd->spin);
lwkt_send_ipiq(rgd, dfly_need_user_resched_remote,
NULL);
} else if (dfly_rdyprocmask & rgd->gd_cpumask) {
atomic_clear_cpumask(&dfly_rdyprocmask,
rgd->gd_cpumask);
spin_unlock(&rdd->spin);
wakeup(&rdd->helper_thread);
spin_unlock(&rdd->spin);
} else if (rgd == mycpu) {
/*
* We should interrupt the currently running thread, which
* is on the current cpu.
*/
spin_unlock(&rdd->spin);
if (rdd->uschedcp == NULL) {
wakeup_mycpu(&rdd->helper_thread); /* XXX */
need_user_resched();
} else {
spin_unlock(&rdd->spin);
need_user_resched();
}
} else {
/*
* We should interrupt the currently running thread, which
* is on a different cpu.
*/
spin_unlock(&rdd->spin);
lwkt_send_ipiq(rgd, dfly_need_user_resched_remote, NULL);
}
#else
/*
@@ -1145,8 +1171,6 @@ dfly_resetpriority(struct lwp *lp)
spin_unlock(&rdd->spin);
need_user_resched();
} else {
atomic_clear_cpumask(&dfly_rdyprocmask,
CPUMASK(rcpu));
spin_unlock(&rdd->spin);
lwkt_send_ipiq(globaldata_find(rcpu),
dfly_need_user_resched_remote,
@@ -1802,10 +1826,25 @@ dfly_need_user_resched_remote(void *dummy)
globaldata_t gd = mycpu;
dfly_pcpu_t dd = &dfly_pcpu[gd->gd_cpuid];

/*
* Flag reschedule needed
*/
need_user_resched();

/* Call wakeup_mycpu to avoid sending IPIs to other CPUs */
wakeup_mycpu(&dd->helper_thread);
/*
* If no user thread is currently running we need to kick the helper
* on our cpu to recover. Otherwise the cpu will never schedule
* anything again.
*
* We cannot schedule the process ourselves because this is an
* IPI callback and we cannot acquire spinlocks in an IPI callback.
*
* Call wakeup_mycpu to avoid sending IPIs to other CPUs
*/
if (dd->uschedcp == NULL && (dfly_rdyprocmask & gd->gd_cpumask)) {
atomic_clear_cpumask(&dfly_rdyprocmask, gd->gd_cpumask);
wakeup_mycpu(&dd->helper_thread);
}
}

#endif
@@ -1916,28 +1955,32 @@ dfly_setrunqueue_locked(dfly_pcpu_t rdd, struct lwp *lp)
}

/*
* Add to the correct queue and set the appropriate bit. If no
* lower priority (i.e. better) processes are in the queue then
* we want a reschedule, calculate the best cpu for the job.
*
* Always run reschedules on the LWPs original cpu.
* Place us on the selected queue. Determine if we should be
* placed at the head of the queue or at the end.
*
* If the lp's rrcount has not been exhausted we want to resume with
* it when this queue is reached the next time, instead of resuming
* with a different lp. This improves cache effects and also avoids
* leaving interrupted MP servers out in the cold holding internal
* locks while trying to run a different thread.
* We are placed at the tail if our round-robin count has expired,
* or is about to expire and the system thinks its a good place to
* round-robin, or there is already a next thread on the queue
* (it might be trying to pick up where it left off and we don't
* want to interfere).
*/
KKASSERT((lp->lwp_mpflags & LWP_MP_ONRUNQ) == 0);
atomic_set_int(&lp->lwp_mpflags, LWP_MP_ONRUNQ);
++rdd->runqcount;
if (lp->lwp_rrcount >= usched_dfly_rrinterval) {

if (lp->lwp_rrcount >= usched_dfly_rrinterval ||
(lp->lwp_rrcount >= usched_dfly_rrinterval / 2 &&
(lp->lwp_thread->td_mpflags & TDF_MP_BATCH_DEMARC)) ||
!TAILQ_EMPTY(q)
) {
atomic_clear_int(&lp->lwp_thread->td_mpflags,
TDF_MP_BATCH_DEMARC);
lp->lwp_rrcount = 0;
TAILQ_INSERT_TAIL(q, lp, lwp_procq);
} else {
TAILQ_INSERT_HEAD(q, lp, lwp_procq);
if (TAILQ_NEXT(lp, lwp_procq) == NULL)
if (TAILQ_EMPTY(q))
lp->lwp_rrcount = 0;
TAILQ_INSERT_HEAD(q, lp, lwp_procq);
}
*which |= 1 << pri;
}
@@ -2252,31 +2295,37 @@ dfly_helper_thread_cpu_init(void)
SYSCTL_ADD_INT(&usched_dfly_sysctl_ctx,
SYSCTL_CHILDREN(usched_dfly_sysctl_tree),
OID_AUTO, "weight1", CTLFLAG_RW,
&usched_dfly_weight1, 10,
&usched_dfly_weight1, 200,
"Weight selection for current cpu");

SYSCTL_ADD_INT(&usched_dfly_sysctl_ctx,
SYSCTL_CHILDREN(usched_dfly_sysctl_tree),
OID_AUTO, "weight2", CTLFLAG_RW,
&usched_dfly_weight2, 5,
&usched_dfly_weight2, 180,
"Weight selection for wakefrom cpu");

SYSCTL_ADD_INT(&usched_dfly_sysctl_ctx,
SYSCTL_CHILDREN(usched_dfly_sysctl_tree),
OID_AUTO, "weight3", CTLFLAG_RW,
&usched_dfly_weight3, 50,
&usched_dfly_weight3, 40,
"Weight selection for num threads on queue");

SYSCTL_ADD_INT(&usched_dfly_sysctl_ctx,
SYSCTL_CHILDREN(usched_dfly_sysctl_tree),
OID_AUTO, "weight4", CTLFLAG_RW,
&usched_dfly_weight4, 50,
&usched_dfly_weight4, 160,
"Availability of other idle cpus");

SYSCTL_ADD_INT(&usched_dfly_sysctl_ctx,
SYSCTL_CHILDREN(usched_dfly_sysctl_tree),
OID_AUTO, "fast_resched", CTLFLAG_RW,
&usched_dfly_fast_resched, 0,
"Availability of other idle cpus");

SYSCTL_ADD_INT(&usched_dfly_sysctl_ctx,
SYSCTL_CHILDREN(usched_dfly_sysctl_tree),
OID_AUTO, "features", CTLFLAG_RW,
&usched_dfly_features, 15,
&usched_dfly_features, 0x8F,
"Allow pulls into empty queues");

SYSCTL_ADD_INT(&usched_dfly_sysctl_ctx,
1 change: 1 addition & 0 deletions sys/sys/thread.h
@@ -380,6 +380,7 @@ struct thread {
#define TDF_MP_WAKEREQ 0x00000002 /* resume_kproc */
#define TDF_MP_EXITWAIT 0x00000004 /* reaper, see lwp_wait() */
#define TDF_MP_EXITSIG 0x00000008 /* reaper, see lwp_wait() */
#define TDF_MP_BATCH_DEMARC 0x00000010 /* batch mode handling */

/*
* Thread priorities. Typically only one thread from any given
