Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

10333 lines (8924 sloc) 272.652 kb
/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 1996-2012. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
#define ERL_PROCESS_C__
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stddef.h> /* offsetof() */
#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#include "bif.h"
#include "erl_db.h"
#include "dist.h"
#include "beam_catches.h"
#include "erl_instrument.h"
#include "erl_threads.h"
#include "erl_binary.h"
#include "beam_bp.h"
#include "erl_cpu_topology.h"
#include "erl_thr_progress.h"
#include "erl_thr_queue.h"
#include "erl_async.h"
#include "dtrace-wrapper.h"
#define ERTS_DELAYED_WAKEUP_INFINITY (~(Uint64) 0)
#define ERTS_DELAYED_WAKEUP_REDUCTIONS ((Uint64) CONTEXT_REDS/2)
#define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS)
#define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \
(ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED/2)
#define ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST (CONTEXT_REDS/10)
#define ERTS_SCHED_SPIN_UNTIL_YIELD 100
#define ERTS_SCHED_SYS_SLEEP_SPINCOUNT_VERY_LONG 40
#define ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_VERY_LONG 1000
#define ERTS_SCHED_SYS_SLEEP_SPINCOUNT_LONG 20
#define ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_LONG 1000
#define ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM 10
#define ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_MEDIUM 1000
#define ERTS_SCHED_SYS_SLEEP_SPINCOUNT_SHORT 10
#define ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_SHORT 0
#define ERTS_SCHED_SYS_SLEEP_SPINCOUNT_VERY_SHORT 5
#define ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_VERY_SHORT 0
#define ERTS_SCHED_SYS_SLEEP_SPINCOUNT_NONE 0
#define ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_NONE 0
#define ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT 1000
#define ERTS_SCHED_SUSPEND_SLEEP_SPINCOUNT 0
#if 0 || defined(DEBUG)
#define ERTS_FAKE_SCHED_BIND_PRINT_SORTED_CPU_DATA
#endif
#if defined(DEBUG) && 0
#define HARDDEBUG
#else
#undef HARDDEBUG
#endif
#ifdef HARDDEBUG
#define HARDDEBUG_RUNQS
#endif
#ifdef HIPE
#include "hipe_mode_switch.h" /* for hipe_init_process() */
#include "hipe_signal.h" /* for hipe_thread_signal_init() */
#endif
#ifdef ERTS_ENABLE_LOCK_COUNT
#include "erl_lock_count.h"
#endif
#define MAX_BIT (1 << PRIORITY_MAX)
#define HIGH_BIT (1 << PRIORITY_HIGH)
#define NORMAL_BIT (1 << PRIORITY_NORMAL)
#define LOW_BIT (1 << PRIORITY_LOW)
#define ERTS_MAYBE_SAVE_TERMINATING_PROCESS(P) \
do { \
ERTS_SMP_LC_ASSERT(erts_lc_mtx_is_locked(&proc_tab_mtx)); \
if (saved_term_procs.end) \
save_terminating_process((P)); \
} while (0)
#define ERTS_EMPTY_RUNQ(RQ) \
((RQ)->len == 0 && (RQ)->misc.start == NULL)
#define ERTS_EMPTY_RUNQ_PORTS(RQ) \
((RQ)->ports.info.len == 0 && (RQ)->misc.start == NULL)
extern BeamInstr beam_apply[];
extern BeamInstr beam_exit[];
extern BeamInstr beam_continue_exit[];
static Sint p_last;
static Sint p_next;
static Sint p_serial;
static Uint p_serial_mask;
static Uint p_serial_shift;
int erts_sched_compact_load;
Uint erts_no_schedulers;
Uint erts_max_processes = ERTS_DEFAULT_MAX_PROCESSES;
Uint erts_process_tab_index_mask;
int erts_sched_thread_suggested_stack_size = -1;
#ifdef ERTS_ENABLE_LOCK_CHECK
ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE];
#endif
static struct {
int aux_work;
int tse;
int sys_schedule;
} sched_busy_wait;
#ifdef ERTS_SMP
int erts_disable_proc_not_running_opt;
static ErtsAuxWorkData *aux_thread_aux_work_data;
#define ERTS_SCHDLR_SSPND_CHNG_WAITER (((erts_aint32_t) 1) << 0)
#define ERTS_SCHDLR_SSPND_CHNG_MSB (((erts_aint32_t) 1) << 1)
#define ERTS_SCHDLR_SSPND_CHNG_ONLN (((erts_aint32_t) 1) << 2)
#ifndef DEBUG
#define ERTS_SCHDLR_SSPND_CHNG_SET(VAL, OLD_VAL) \
erts_smp_atomic32_set_nob(&schdlr_sspnd.changing, (VAL))
#else
#define ERTS_SCHDLR_SSPND_CHNG_SET(VAL, OLD_VAL) \
do { \
erts_aint32_t old_val__; \
old_val__ = erts_smp_atomic32_xchg_nob(&schdlr_sspnd.changing, \
(VAL)); \
ASSERT(old_val__ == (OLD_VAL)); \
} while (0)
#endif
static struct {
erts_smp_mtx_t mtx;
erts_smp_cnd_t cnd;
int online;
int curr_online;
int wait_curr_online;
erts_smp_atomic32_t changing;
erts_smp_atomic32_t active;
struct {
int ongoing;
long wait_active;
ErtsProcList *procs;
} msb; /* Multi Scheduling Block */
} schdlr_sspnd;
static struct {
erts_smp_mtx_t update_mtx;
erts_smp_atomic32_t no_runqs;
int last_active_runqs;
int forced_check_balance;
erts_smp_atomic32_t checking_balance;
int halftime;
int full_reds_history_index;
struct {
int active_runqs;
int reds;
int max_len;
} prev_rise;
Uint n;
} balance_info;
#define ERTS_BLNCE_SAVE_RISE(ACTIVE, MAX_LEN, REDS) \
do { \
balance_info.prev_rise.active_runqs = (ACTIVE); \
balance_info.prev_rise.max_len = (MAX_LEN); \
balance_info.prev_rise.reds = (REDS); \
} while (0)
#endif
erts_sched_stat_t erts_sched_stat;
#ifdef USE_THREADS
static erts_tsd_key_t sched_data_key;
#endif
static erts_smp_mtx_t proc_tab_mtx;
static erts_smp_atomic32_t function_calls;
#ifdef ERTS_SMP
static erts_smp_atomic32_t doing_sys_schedule;
static erts_smp_atomic32_t no_empty_run_queues;
#else /* !ERTS_SMP */
ErtsSchedulerData *erts_scheduler_data;
#endif
ErtsAlignedRunQueue *erts_aligned_run_queues;
Uint erts_no_run_queues;
ErtsAlignedSchedulerData *erts_aligned_scheduler_data;
typedef union {
ErtsSchedulerSleepInfo ssi;
char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsSchedulerSleepInfo))];
} ErtsAlignedSchedulerSleepInfo;
static ErtsAlignedSchedulerSleepInfo *aligned_sched_sleep_info;
Process** process_tab;
static Uint last_reductions;
static Uint last_exact_reductions;
Uint erts_default_process_flags;
Eterm erts_system_monitor;
Eterm erts_system_monitor_long_gc;
Eterm erts_system_monitor_large_heap;
struct erts_system_monitor_flags_t erts_system_monitor_flags;
/* system performance monitor */
Eterm erts_system_profile;
struct erts_system_profile_flags_t erts_system_profile_flags;
#if ERTS_MAX_PROCESSES > 0x7fffffff
#error "Need to store process_count in another type"
#endif
static erts_smp_atomic32_t process_count;
typedef struct ErtsTermProcElement_ ErtsTermProcElement;
struct ErtsTermProcElement_ {
ErtsTermProcElement *next;
ErtsTermProcElement *prev;
int ix;
union {
struct {
Eterm pid;
SysTimeval spawned;
SysTimeval exited;
} process;
struct {
SysTimeval time;
} bif_invocation;
} u;
};
static struct {
ErtsTermProcElement *start;
ErtsTermProcElement *end;
} saved_term_procs;
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_op_list,
ErtsMiscOpList,
10,
ERTS_ALC_T_MISC_OP_LIST)
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(proclist,
ErtsProcList,
200,
ERTS_ALC_T_PROC_LIST)
#define ERTS_SCHED_SLEEP_INFO_IX(IX) \
(ASSERT_EXPR(-1 <= ((int) (IX)) \
&& ((int) (IX)) < ((int) erts_no_schedulers)), \
&aligned_sched_sleep_info[(IX)].ssi)
#define ERTS_FOREACH_RUNQ(RQVAR, DO) \
do { \
ErtsRunQueue *RQVAR; \
int ix__; \
for (ix__ = 0; ix__ < erts_no_run_queues; ix__++) { \
RQVAR = ERTS_RUNQ_IX(ix__); \
erts_smp_runq_lock(RQVAR); \
{ DO; } \
erts_smp_runq_unlock(RQVAR); \
} \
} while (0)
#define ERTS_FOREACH_OP_RUNQ(RQVAR, DO) \
do { \
ErtsRunQueue *RQVAR; \
int ix__; \
ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&schdlr_sspnd.mtx)); \
for (ix__ = 0; ix__ < schdlr_sspnd.online; ix__++) { \
RQVAR = ERTS_RUNQ_IX(ix__); \
erts_smp_runq_lock(RQVAR); \
{ DO; } \
erts_smp_runq_unlock(RQVAR); \
} \
} while (0)
#define ERTS_ATOMIC_FOREACH_RUNQ_X(RQVAR, DO, DOX) \
do { \
ErtsRunQueue *RQVAR; \
int ix__; \
for (ix__ = 0; ix__ < erts_no_run_queues; ix__++) { \
RQVAR = ERTS_RUNQ_IX(ix__); \
erts_smp_runq_lock(RQVAR); \
{ DO; } \
} \
{ DOX; } \
for (ix__ = 0; ix__ < erts_no_run_queues; ix__++) \
erts_smp_runq_unlock(ERTS_RUNQ_IX(ix__)); \
} while (0)
#define ERTS_ATOMIC_FOREACH_RUNQ(RQVAR, DO) \
ERTS_ATOMIC_FOREACH_RUNQ_X(RQVAR, DO, )
/*
* Local functions.
*/
static void init_processes_bif(void);
static void save_terminating_process(Process *p);
static void exec_misc_ops(ErtsRunQueue *);
static void print_function_from_pc(int to, void *to_arg, BeamInstr* x);
static int stack_element_dump(int to, void *to_arg, Process* p, Eterm* sp,
int yreg);
static void aux_work_timeout(void *unused);
static void aux_work_timeout_early_init(int no_schedulers);
static void aux_work_timeout_late_init(void);
static void setup_aux_work_timer(void);
#if defined(DEBUG) || 0
#define ERTS_DBG_CHK_AUX_WORK_VAL(V) dbg_chk_aux_work_val((V))
static void
dbg_chk_aux_work_val(erts_aint32_t value)
{
erts_aint32_t valid = 0;
valid |= ERTS_SSI_AUX_WORK_SET_TMO;
valid |= ERTS_SSI_AUX_WORK_MISC;
valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM;
valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC;
#if ERTS_USE_ASYNC_READY_Q
valid |= ERTS_SSI_AUX_WORK_ASYNC_READY;
valid |= ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
#endif
#ifdef ERTS_SMP
valid |= ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP;
valid |= ERTS_SSI_AUX_WORK_MISC_THR_PRGR;
valid |= ERTS_SSI_AUX_WORK_DD;
valid |= ERTS_SSI_AUX_WORK_DD_THR_PRGR;
#endif
#if HAVE_ERTS_MSEG
valid |= ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK;
#endif
#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN
valid |= ERTS_SSI_AUX_WORK_CHECK_CHILDREN;
#endif
#ifdef ERTS_SSI_AUX_WORK_REAP_PORTS
valid |= ERTS_SSI_AUX_WORK_REAP_PORTS;
#endif
if (~valid & value)
erl_exit(ERTS_ABORT_EXIT,
"Invalid aux_work value found: 0x%x\n",
~valid & value);
}
#define ERTS_DBG_CHK_SSI_AUX_WORK(SSI) \
ERTS_DBG_CHK_AUX_WORK_VAL(erts_atomic32_read_nob(&(SSI)->aux_work))
#else
#define ERTS_DBG_CHK_AUX_WORK_VAL(V)
#define ERTS_DBG_CHK_SSI_AUX_WORK(SSI)
#endif
#ifdef ERTS_SMP
static void handle_pending_exiters(ErtsProcList *);
#endif
#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
int
erts_smp_lc_runq_is_locked(ErtsRunQueue *runq)
{
return erts_smp_lc_mtx_is_locked(&runq->mtx);
}
#endif
void
erts_pre_init_process(void)
{
#ifdef USE_THREADS
erts_tsd_key_create(&sched_data_key);
#endif
#ifdef ERTS_ENABLE_LOCK_CHECK
{
int ix;
erts_psd_required_locks[ERTS_PSD_ERROR_HANDLER].get_locks
= ERTS_PSD_ERROR_HANDLER_BUF_GET_LOCKS;
erts_psd_required_locks[ERTS_PSD_ERROR_HANDLER].set_locks
= ERTS_PSD_ERROR_HANDLER_BUF_SET_LOCKS;
erts_psd_required_locks[ERTS_PSD_SAVED_CALLS_BUF].get_locks
= ERTS_PSD_SAVED_CALLS_BUF_GET_LOCKS;
erts_psd_required_locks[ERTS_PSD_SAVED_CALLS_BUF].set_locks
= ERTS_PSD_SAVED_CALLS_BUF_SET_LOCKS;
erts_psd_required_locks[ERTS_PSD_SCHED_ID].get_locks
= ERTS_PSD_SCHED_ID_GET_LOCKS;
erts_psd_required_locks[ERTS_PSD_SCHED_ID].set_locks
= ERTS_PSD_SCHED_ID_SET_LOCKS;
erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].get_locks
= ERTS_PSD_DIST_ENTRY_GET_LOCKS;
erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].set_locks
= ERTS_PSD_DIST_ENTRY_SET_LOCKS;
erts_psd_required_locks[ERTS_PSD_CALL_TIME_BP].get_locks
= ERTS_PSD_CALL_TIME_BP_GET_LOCKS;
erts_psd_required_locks[ERTS_PSD_CALL_TIME_BP].set_locks
= ERTS_PSD_CALL_TIME_BP_SET_LOCKS;
/* Check that we have locks for all entries */
for (ix = 0; ix < ERTS_PSD_SIZE; ix++) {
ERTS_SMP_LC_ASSERT(erts_psd_required_locks[ix].get_locks);
ERTS_SMP_LC_ASSERT(erts_psd_required_locks[ix].set_locks);
}
}
#endif
}
/* initialize the scheduler */
void
erts_init_process(int ncpu)
{
Uint proc_bits = ERTS_PROC_BITS;
#ifdef ERTS_SMP
erts_disable_proc_not_running_opt = 0;
erts_init_proc_lock(ncpu);
#endif
init_proclist_alloc();
erts_smp_atomic32_init_nob(&process_count, 0);
if (erts_use_r9_pids_ports) {
proc_bits = ERTS_R9_PROC_BITS;
ASSERT(erts_max_processes <= (1 << ERTS_R9_PROC_BITS));
}
process_tab = (Process**) erts_alloc(ERTS_ALC_T_PROC_TABLE,
erts_max_processes*sizeof(Process*));
sys_memzero(process_tab, erts_max_processes * sizeof(Process*));
erts_smp_mtx_init(&proc_tab_mtx, "proc_tab");
p_last = -1;
p_next = 0;
p_serial = 0;
p_serial_shift = erts_fit_in_bits(erts_max_processes - 1);
p_serial_mask = ((~(~((Uint) 0) << proc_bits)) >> p_serial_shift);
erts_process_tab_index_mask = ~(~((Uint) 0) << p_serial_shift);
last_reductions = 0;
last_exact_reductions = 0;
erts_default_process_flags = 0;
}
void
erts_late_init_process(void)
{
int ix;
init_processes_bif();
erts_smp_spinlock_init(&erts_sched_stat.lock, "sched_stat");
for (ix = 0; ix < ERTS_NO_PRIO_LEVELS; ix++) {
Eterm atom;
char *atom_str;
switch (ix) {
case PRIORITY_MAX:
atom_str = "process_max";
break;
case PRIORITY_HIGH:
atom_str = "process_high";
break;
case PRIORITY_NORMAL:
atom_str = "process_normal";
break;
case PRIORITY_LOW:
atom_str = "process_low";
break;
case ERTS_PORT_PRIO_LEVEL:
atom_str = "port";
break;
default:
atom_str = "bad_prio";
ASSERT(!"bad prio");
break;
}
atom = am_atom_put(atom_str, sys_strlen(atom_str));
erts_sched_stat.prio[ix].name = atom;
erts_sched_stat.prio[ix].total_executed = 0;
erts_sched_stat.prio[ix].executed = 0;
erts_sched_stat.prio[ix].total_migrated = 0;
erts_sched_stat.prio[ix].migrated = 0;
}
}
static void
init_sched_wall_time(ErtsSchedWallTime *swtp)
{
swtp->enabled = 0;
swtp->start = 0;
swtp->working.total = 0;
swtp->working.start = 0;
swtp->working.currently = 0;
}
static ERTS_INLINE Uint64
sched_wall_time_ts(void)
{
#ifdef HAVE_GETHRTIME
return (Uint64) sys_gethrtime();
#else
Uint64 res;
SysTimeval tv;
sys_gettimeofday(&tv);
res = (Uint64) tv.tv_sec*1000000;
res += (Uint64) tv.tv_usec;
return res;
#endif
}
static ERTS_INLINE void
sched_wall_time_change(ErtsSchedulerData *esdp, int working)
{
if (esdp->sched_wall_time.enabled) {
Uint64 ts = sched_wall_time_ts();
if (working) {
#ifdef DEBUG
ASSERT(!esdp->sched_wall_time.working.currently);
esdp->sched_wall_time.working.currently = 1;
#endif
ts -= esdp->sched_wall_time.start;
esdp->sched_wall_time.working.start = ts;
}
else {
#ifdef DEBUG
ASSERT(esdp->sched_wall_time.working.currently);
esdp->sched_wall_time.working.currently = 0;
#endif
ts -= esdp->sched_wall_time.start;
ts -= esdp->sched_wall_time.working.start;
esdp->sched_wall_time.working.total += ts;
}
}
}
typedef struct {
int set;
int enable;
Process *proc;
Eterm ref;
Eterm ref_heap[REF_THING_SIZE];
Uint req_sched;
erts_smp_atomic32_t refc;
} ErtsSchedWallTimeReq;
#if !HALFWORD_HEAP
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(swtreq,
ErtsSchedWallTimeReq,
5,
ERTS_ALC_T_SCHED_WTIME_REQ)
#else
static ERTS_INLINE ErtsSchedWallTimeReq *
swtreq_alloc(void)
{
return erts_alloc(ERTS_ALC_T_SCHED_WTIME_REQ,
sizeof(ErtsSchedWallTimeReq));
}
static ERTS_INLINE void
swtreq_free(ErtsSchedWallTimeReq *ptr)
{
erts_free(ERTS_ALC_T_SCHED_WTIME_REQ, ptr);
}
#endif
static void
reply_sched_wall_time(void *vswtrp)
{
Uint64 working = 0, total = 0;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErtsSchedWallTimeReq *swtrp = (ErtsSchedWallTimeReq *) vswtrp;
ErtsProcLocks rp_locks = (swtrp->req_sched == esdp->no
? ERTS_PROC_LOCK_MAIN
: 0);
Process *rp = swtrp->proc;
Eterm ref_copy = NIL, msg;
Eterm *hp = NULL;
Eterm **hpp;
Uint sz, *szp;
ErlOffHeap *ohp = NULL;
ErlHeapFragment *bp = NULL;
ASSERT(esdp);
if (swtrp->set) {
if (!swtrp->enable && esdp->sched_wall_time.enabled)
esdp->sched_wall_time.enabled = 0;
else if (swtrp->enable && !esdp->sched_wall_time.enabled) {
Uint64 ts = sched_wall_time_ts();
esdp->sched_wall_time.enabled = 1;
esdp->sched_wall_time.start = ts;
esdp->sched_wall_time.working.total = 0;
esdp->sched_wall_time.working.start = 0;
esdp->sched_wall_time.working.currently = 1;
}
}
if (esdp->sched_wall_time.enabled) {
Uint64 ts = sched_wall_time_ts();
ASSERT(esdp->sched_wall_time.working.currently);
ts -= esdp->sched_wall_time.start;
total = ts;
ts -= esdp->sched_wall_time.working.start;
working = esdp->sched_wall_time.working.total + ts;
}
sz = 0;
hpp = NULL;
szp = &sz;
while (1) {
if (hpp)
ref_copy = STORE_NC(hpp, ohp, swtrp->ref);
else
*szp += REF_THING_SIZE;
if (swtrp->set)
msg = ref_copy;
else {
msg = (!esdp->sched_wall_time.enabled
? am_notsup
: erts_bld_tuple(hpp, szp, 3,
make_small(esdp->no),
erts_bld_uint64(hpp, szp, working),
erts_bld_uint64(hpp, szp, total)));
msg = erts_bld_tuple(hpp, szp, 2, ref_copy, msg);
}
if (hpp)
break;
hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks);
szp = NULL;
hpp = &hp;
}
erts_queue_message(rp, &rp_locks, bp, msg, NIL
#ifdef USE_VM_PROBES
, NIL
#endif
);
if (swtrp->req_sched == esdp->no)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
erts_smp_proc_dec_refc(rp);
if (erts_smp_atomic32_dec_read_nob(&swtrp->refc) == 0)
swtreq_free(vswtrp);
}
Eterm
erts_sched_wall_time_request(Process *c_p, int set, int enable)
{
ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(c_p);
Eterm ref;
ErtsSchedWallTimeReq *swtrp;
Eterm *hp;
if (!set && !esdp->sched_wall_time.enabled)
return THE_NON_VALUE;
swtrp = swtreq_alloc();
ref = erts_make_ref(c_p);
hp = &swtrp->ref_heap[0];
swtrp->set = set;
swtrp->enable = enable;
swtrp->proc = c_p;
swtrp->ref = STORE_NC(&hp, NULL, ref);
swtrp->req_sched = esdp->no;
erts_smp_atomic32_init_nob(&swtrp->refc,
(erts_aint32_t) erts_no_schedulers);
erts_smp_proc_add_refc(c_p, (Sint32) erts_no_schedulers);
#ifdef ERTS_SMP
if (erts_no_schedulers > 1)
erts_schedule_multi_misc_aux_work(1,
erts_no_schedulers,
reply_sched_wall_time,
(void *) swtrp);
#endif
reply_sched_wall_time((void *) swtrp);
return ref;
}
static ERTS_INLINE ErtsProcList *
proclist_create(Process *p)
{
ErtsProcList *plp = proclist_alloc();
plp->pid = p->id;
plp->started = p->started;
return plp;
}
static ERTS_INLINE void
proclist_destroy(ErtsProcList *plp)
{
proclist_free(plp);
}
static ERTS_INLINE int
proclist_same(ErtsProcList *plp, Process *p)
{
return (plp->pid == p->id
&& erts_cmp_timeval(&plp->started, &p->started) == 0);
}
ErtsProcList *
erts_proclist_create(Process *p)
{
return proclist_create(p);
}
void
erts_proclist_destroy(ErtsProcList *plp)
{
proclist_destroy(plp);
}
int
erts_proclist_same(ErtsProcList *plp, Process *p)
{
return proclist_same(plp, p);
}
void *
erts_psd_set_init(Process *p, ErtsProcLocks plocks, int ix, void *data)
{
void *old;
ErtsProcLocks xplocks;
int refc = 0;
ErtsPSD *psd = erts_alloc(ERTS_ALC_T_PSD, sizeof(ErtsPSD));
int i;
for (i = 0; i < ERTS_PSD_SIZE; i++)
psd->data[i] = NULL;
ERTS_SMP_LC_ASSERT(plocks);
ERTS_SMP_LC_ASSERT(plocks == erts_proc_lc_my_proc_locks(p));
xplocks = ERTS_PROC_LOCKS_ALL;
xplocks &= ~plocks;
if (xplocks && erts_smp_proc_trylock(p, xplocks) == EBUSY) {
if (xplocks & ERTS_PROC_LOCK_MAIN) {
erts_smp_proc_inc_refc(p);
erts_smp_proc_unlock(p, plocks);
erts_smp_proc_lock(p, ERTS_PROC_LOCKS_ALL);
refc = 1;
}
else {
if (plocks & ERTS_PROC_LOCKS_ALL_MINOR)
erts_smp_proc_unlock(p, plocks & ERTS_PROC_LOCKS_ALL_MINOR);
erts_smp_proc_lock(p, ERTS_PROC_LOCKS_ALL_MINOR);
}
}
if (!p->psd)
p->psd = psd;
if (xplocks)
erts_smp_proc_unlock(p, xplocks);
if (refc)
erts_smp_proc_dec_refc(p);
ASSERT(p->psd);
if (p->psd != psd)
erts_free(ERTS_ALC_T_PSD, psd);
old = p->psd->data[ix];
p->psd->data[ix] = data;
ERTS_SMP_LC_ASSERT(plocks == erts_proc_lc_my_proc_locks(p));
return old;
}
#ifdef ERTS_SMP
void
erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags)
{
switch (flags & ERTS_SSI_FLGS_SLEEP_TYPE) {
case ERTS_SSI_FLG_POLL_SLEEPING:
erts_sys_schedule_interrupt(1);
break;
case ERTS_SSI_FLG_POLL_SLEEPING|ERTS_SSI_FLG_TSE_SLEEPING:
/*
* Thread progress blocking while poll sleeping; need
* to signal on both...
*/
erts_sys_schedule_interrupt(1);
/* fall through */
case ERTS_SSI_FLG_TSE_SLEEPING:
erts_tse_set(ssi->event);
break;
case 0:
break;
default:
erl_exit(ERTS_ABORT_EXIT, "%s:%d: Internal error\n",
__FILE__, __LINE__);
break;
}
}
#endif
static ERTS_INLINE void
set_aux_work_flags_wakeup_nob(ErtsSchedulerSleepInfo *ssi,
erts_aint32_t flgs)
{
erts_aint32_t old_flgs;
ERTS_DBG_CHK_SSI_AUX_WORK(ssi);
old_flgs = erts_atomic32_read_nob(&ssi->aux_work);
if ((old_flgs & flgs) == 0) {
old_flgs = erts_atomic32_read_bor_nob(&ssi->aux_work, flgs);
if ((old_flgs & flgs) == 0) {
#ifdef ERTS_SMP
erts_sched_poke(ssi);
#else
erts_sys_schedule_interrupt(1);
#endif
}
}
}
static ERTS_INLINE void
set_aux_work_flags_wakeup_relb(ErtsSchedulerSleepInfo *ssi,
erts_aint32_t flgs)
{
erts_aint32_t old_flgs;
ERTS_DBG_CHK_SSI_AUX_WORK(ssi);
old_flgs = erts_atomic32_read_bor_relb(&ssi->aux_work, flgs);
if ((old_flgs & flgs) == 0) {
#ifdef ERTS_SMP
erts_sched_poke(ssi);
#else
erts_sys_schedule_interrupt(1);
#endif
}
}
static ERTS_INLINE erts_aint32_t
set_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs)
{
return erts_atomic32_read_bor_nob(&ssi->aux_work, flgs);
}
static ERTS_INLINE erts_aint32_t
unset_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs)
{
return erts_atomic32_read_band_nob(&ssi->aux_work, ~flgs);
}
#ifdef ERTS_SMP
static ERTS_INLINE void
haw_thr_prgr_current_reset(ErtsAuxWorkData *awdp)
{
awdp->current_thr_prgr = ERTS_THR_PRGR_INVALID;
}
static ERTS_INLINE ErtsThrPrgrVal
haw_thr_prgr_current(ErtsAuxWorkData *awdp)
{
ErtsThrPrgrVal current = awdp->current_thr_prgr;
if (current == ERTS_THR_PRGR_INVALID) {
current = erts_thr_progress_current();
awdp->current_thr_prgr = current;
}
return current;
}
static ERTS_INLINE void
haw_thr_prgr_current_check_progress(ErtsAuxWorkData *awdp)
{
ErtsThrPrgrVal current = awdp->current_thr_prgr;
if (current != ERTS_THR_PRGR_INVALID
&& !erts_thr_progress_equal(current, erts_thr_progress_current())) {
/*
* We have used a previouly read current value that isn't the
* latest; need to poke ourselfs in order to guarantee no loss
* of wakeups.
*/
erts_sched_poke(awdp->ssi);
}
}
static ERTS_INLINE erts_aint32_t
handle_delayed_aux_work_wakeup(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
int jix, max_jix;
ASSERT(awdp->delayed_wakeup.next != ERTS_DELAYED_WAKEUP_INFINITY);
if (!waiting && awdp->delayed_wakeup.next > awdp->esdp->reductions)
return aux_work;
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP);
ERTS_THR_MEMORY_BARRIER;
max_jix = awdp->delayed_wakeup.jix;
awdp->delayed_wakeup.jix = -1;
for (jix = 0; jix <= max_jix; jix++) {
int sched = awdp->delayed_wakeup.job[jix].sched;
erts_aint32_t aux_work = awdp->delayed_wakeup.job[jix].aux_work;
ASSERT(awdp->delayed_wakeup.sched2jix[sched] == jix);
awdp->delayed_wakeup.sched2jix[sched] = -1;
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(sched-1),
aux_work);
}
awdp->delayed_wakeup.next = ERTS_DELAYED_WAKEUP_INFINITY;
return aux_work & ~ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP;
}
static ERTS_INLINE void
schedule_aux_work_wakeup(ErtsAuxWorkData *awdp,
int sched,
erts_aint32_t aux_work)
{
int jix = awdp->delayed_wakeup.sched2jix[sched];
if (jix >= 0) {
ASSERT(awdp->delayed_wakeup.job[jix].sched == sched);
awdp->delayed_wakeup.job[jix].aux_work |= aux_work;
}
else {
jix = ++awdp->delayed_wakeup.jix;
awdp->delayed_wakeup.sched2jix[sched] = jix;
awdp->delayed_wakeup.job[jix].sched = sched;
awdp->delayed_wakeup.job[jix].aux_work = aux_work;
}
if (awdp->delayed_wakeup.next != ERTS_DELAYED_WAKEUP_INFINITY) {
ASSERT(erts_atomic32_read_nob(&awdp->ssi->aux_work)
& ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP);
}
else {
awdp->delayed_wakeup.next = (awdp->esdp->reductions
+ ERTS_DELAYED_WAKEUP_REDUCTIONS);
ASSERT(!(erts_atomic32_read_nob(&awdp->ssi->aux_work)
& ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP));
set_aux_work_flags_wakeup_nob(awdp->ssi,
ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP);
}
}
#endif
typedef struct erts_misc_aux_work_t_ erts_misc_aux_work_t;
struct erts_misc_aux_work_t_ {
void (*func)(void *);
void *arg;
};
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work,
erts_misc_aux_work_t,
200,
ERTS_ALC_T_MISC_AUX_WORK)
typedef union {
ErtsThrQ_t q;
char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))];
} erts_algnd_misc_aux_work_q_t;
static erts_algnd_misc_aux_work_q_t *misc_aux_work_queues;
static void
notify_aux_work(void *vssi)
{
set_aux_work_flags_wakeup_nob((ErtsSchedulerSleepInfo *) vssi,
ERTS_SSI_AUX_WORK_MISC);
}
static void
init_misc_aux_work(void)
{
int ix;
ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
qinit.notify = notify_aux_work;
init_misc_aux_work_alloc();
misc_aux_work_queues =
erts_alloc_permanent_cache_aligned(ERTS_ALC_T_MISC_AUX_WORK_Q,
sizeof(erts_algnd_misc_aux_work_q_t)
* (erts_no_schedulers+1));
#ifdef ERTS_SMP
ix = 0; /* aux_thread + schedulers */
#else
ix = 1; /* scheduler only */
#endif
for (; ix <= erts_no_schedulers; ix++) {
qinit.arg = (void *) ERTS_SCHED_SLEEP_INFO_IX(ix-1);
erts_thr_q_initialize(&misc_aux_work_queues[ix].q, &qinit);
}
}
static erts_aint32_t
misc_aux_work_clean(ErtsThrQ_t *q,
ErtsAuxWorkData *awdp,
erts_aint32_t aux_work)
{
switch (erts_thr_q_clean(q)) {
case ERTS_THR_Q_DIRTY:
set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC);
return aux_work | ERTS_SSI_AUX_WORK_MISC;
case ERTS_THR_Q_NEED_THR_PRGR:
#ifdef ERTS_SMP
set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
erts_thr_progress_wakeup(awdp->esdp,
erts_thr_q_need_thr_progress(q));
#endif
case ERTS_THR_Q_CLEAN:
break;
}
return aux_work;
}
static ERTS_INLINE erts_aint32_t
handle_misc_aux_work(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work,
int waiting)
{
ErtsThrQ_t *q = &misc_aux_work_queues[awdp->sched_id].q;
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC);
while (1) {
erts_misc_aux_work_t *mawp = erts_thr_q_dequeue(q);
if (!mawp)
break;
mawp->func(mawp->arg);
misc_aux_work_free(mawp);
}
return misc_aux_work_clean(q, awdp, aux_work & ~ERTS_SSI_AUX_WORK_MISC);
}
#ifdef ERTS_SMP
static ERTS_INLINE erts_aint32_t
handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work,
int waiting)
{
if (!erts_thr_progress_has_reached_this(haw_thr_prgr_current(awdp),
awdp->misc.thr_prgr))
return aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR;
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
return misc_aux_work_clean(&misc_aux_work_queues[awdp->sched_id].q,
awdp,
aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
}
#endif
static ERTS_INLINE void
schedule_misc_aux_work(int sched_id,
void (*func)(void *),
void *arg)
{
ErtsThrQ_t *q;
erts_misc_aux_work_t *mawp;
#ifdef ERTS_SMP
ASSERT(0 <= sched_id && sched_id <= erts_no_schedulers);
#else
ASSERT(sched_id == 1);
#endif
q = &misc_aux_work_queues[sched_id].q;
mawp = misc_aux_work_alloc();
mawp->func = func;
mawp->arg = arg;
erts_thr_q_enqueue(q, mawp);
}
void
erts_schedule_misc_aux_work(int sched_id,
void (*func)(void *),
void *arg)
{
schedule_misc_aux_work(sched_id, func, arg);
}
void
erts_schedule_multi_misc_aux_work(int ignore_self,
int max_sched,
void (*func)(void *),
void *arg)
{
int id, self = 0;
if (ignore_self) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
if (esdp)
self = (int) esdp->no;
}
ASSERT(0 < max_sched && max_sched <= erts_no_schedulers);
for (id = 1; id <= max_sched; id++) {
if (id == self)
continue;
schedule_misc_aux_work(id, func, arg);
}
}
#if ERTS_USE_ASYNC_READY_Q
void
erts_notify_check_async_ready_queue(void *vno)
{
int ix = ((int) (SWord) vno) -1;
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix),
ERTS_SSI_AUX_WORK_ASYNC_READY);
}
static ERTS_INLINE erts_aint32_t
handle_async_ready(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work,
int waiting)
{
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY);
if (erts_check_async_ready(awdp->async_ready.queue)) {
if (set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY)
& ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) {
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
aux_work &= ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
}
return aux_work;
}
#ifdef ERTS_SMP
awdp->async_ready.need_thr_prgr = 0;
#endif
set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
return ((aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY)
| ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
}
static ERTS_INLINE erts_aint32_t
handle_async_ready_clean(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work,
int waiting)
{
void *thr_prgr_p;
#ifdef ERTS_SMP
if (awdp->async_ready.need_thr_prgr
&& !erts_thr_progress_has_reached_this(haw_thr_prgr_current(awdp),
awdp->async_ready.thr_prgr)) {
return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
}
awdp->async_ready.need_thr_prgr = 0;
thr_prgr_p = (void *) &awdp->async_ready.thr_prgr;
#else
thr_prgr_p = NULL;
#endif
switch (erts_async_ready_clean(awdp->async_ready.queue, thr_prgr_p)) {
case ERTS_ASYNC_READY_CLEAN:
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
#ifdef ERTS_SMP
case ERTS_ASYNC_READY_NEED_THR_PRGR:
erts_thr_progress_wakeup(awdp->esdp,
awdp->async_ready.thr_prgr);
awdp->async_ready.need_thr_prgr = 1;
return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
#endif
default:
return aux_work;
}
}
#endif
static ERTS_INLINE erts_aint32_t
handle_fix_alloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
erts_aint32_t res;
unset_aux_work_flags(ssi, (ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM
| ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC));
aux_work &= ~(ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM
| ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC);
res = erts_alloc_fix_alloc_shrink(awdp->sched_id, aux_work);
if (res) {
set_aux_work_flags(ssi, res);
aux_work |= res;
}
return aux_work;
}
#ifdef ERTS_SMP
void
erts_alloc_notify_delayed_dealloc(int ix)
{
ErtsSchedulerData *esdp = erts_get_scheduler_data();
if (esdp)
schedule_aux_work_wakeup(&esdp->aux_work_data,
ix,
ERTS_SSI_AUX_WORK_DD);
else
set_aux_work_flags_wakeup_relb(ERTS_SCHED_SLEEP_INFO_IX(ix-1),
ERTS_SSI_AUX_WORK_DD);
}
static ERTS_INLINE erts_aint32_t
handle_delayed_dealloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
int need_thr_progress = 0;
ErtsThrPrgrVal wakeup = ERTS_THR_PRGR_INVALID;
int more_work = 0;
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD);
erts_alloc_scheduler_handle_delayed_dealloc((void *) awdp->esdp,
&need_thr_progress,
&wakeup,
&more_work);
if (more_work) {
if (set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD)
& ERTS_SSI_AUX_WORK_DD_THR_PRGR) {
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD_THR_PRGR);
aux_work &= ~ERTS_SSI_AUX_WORK_DD_THR_PRGR;
}
return aux_work;
}
if (need_thr_progress) {
if (wakeup == ERTS_THR_PRGR_INVALID)
wakeup = erts_thr_progress_later(awdp->esdp);
awdp->dd.thr_prgr = wakeup;
set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD_THR_PRGR);
awdp->dd.thr_prgr = wakeup;
erts_thr_progress_wakeup(awdp->esdp, wakeup);
}
else if (awdp->dd.completed_callback) {
awdp->dd.completed_callback(awdp->dd.completed_arg);
awdp->dd.completed_callback = NULL;
awdp->dd.completed_arg = NULL;
}
return aux_work & ~ERTS_SSI_AUX_WORK_DD;
}
static ERTS_INLINE erts_aint32_t
handle_delayed_dealloc_thr_prgr(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
ErtsSchedulerSleepInfo *ssi;
int need_thr_progress;
int more_work;
ErtsThrPrgrVal wakeup = ERTS_THR_PRGR_INVALID;
ErtsThrPrgrVal current = haw_thr_prgr_current(awdp);
if (!erts_thr_progress_has_reached_this(current, awdp->dd.thr_prgr))
return aux_work & ~ERTS_SSI_AUX_WORK_DD_THR_PRGR;
ssi = awdp->ssi;
need_thr_progress = 0;
more_work = 0;
erts_alloc_scheduler_handle_delayed_dealloc((void *) awdp->esdp,
&need_thr_progress,
&wakeup,
&more_work);
if (more_work) {
set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD);
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD_THR_PRGR);
return ((aux_work & ~ERTS_SSI_AUX_WORK_DD_THR_PRGR)
| ERTS_SSI_AUX_WORK_DD);
}
if (need_thr_progress) {
if (wakeup == ERTS_THR_PRGR_INVALID)
wakeup = erts_thr_progress_later(awdp->esdp);
awdp->dd.thr_prgr = wakeup;
erts_thr_progress_wakeup(awdp->esdp, wakeup);
}
else {
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD_THR_PRGR);
if (awdp->dd.completed_callback) {
awdp->dd.completed_callback(awdp->dd.completed_arg);
awdp->dd.completed_callback = NULL;
awdp->dd.completed_arg = NULL;
}
}
return aux_work & ~ERTS_SSI_AUX_WORK_DD_THR_PRGR;
}
static erts_atomic32_t completed_dealloc_count;
static void
completed_dealloc(void *vproc)
{
if (erts_atomic32_dec_read_mb(&completed_dealloc_count) == 0) {
erts_resume((Process *) vproc, (ErtsProcLocks) 0);
erts_smp_proc_dec_refc((Process *) vproc);
}
}
static void
setup_completed_dealloc(void *vproc)
{
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErtsAuxWorkData *awdp = (esdp
? &esdp->aux_work_data
: aux_thread_aux_work_data);
erts_alloc_fix_alloc_shrink(awdp->sched_id, 0);
set_aux_work_flags_wakeup_nob(awdp->ssi, ERTS_SSI_AUX_WORK_DD);
awdp->dd.completed_callback = completed_dealloc;
awdp->dd.completed_arg = vproc;
}
static void
prep_setup_completed_dealloc(void *vproc)
{
erts_aint32_t count = (erts_aint32_t) (erts_no_schedulers+1);
if (erts_atomic32_dec_read_mb(&completed_dealloc_count) == count) {
/* scheduler threads */
erts_schedule_multi_misc_aux_work(0,
erts_no_schedulers,
setup_completed_dealloc,
vproc);
/* aux_thread */
erts_schedule_misc_aux_work(0,
setup_completed_dealloc,
vproc);
}
}
#endif /* ERTS_SMP */
int
erts_debug_wait_deallocations(Process *c_p)
{
#ifndef ERTS_SMP
erts_alloc_fix_alloc_shrink(1, 0);
return 1;
#else
/* Only one process at a time can do this */
erts_aint32_t count = (erts_aint32_t) (2*(erts_no_schedulers+1));
if (0 == erts_atomic32_cmpxchg_mb(&completed_dealloc_count,
count,
0)) {
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
erts_smp_proc_inc_refc(c_p);
/* scheduler threads */
erts_schedule_multi_misc_aux_work(0,
erts_no_schedulers,
prep_setup_completed_dealloc,
(void *) c_p);
/* aux_thread */
erts_schedule_misc_aux_work(0,
prep_setup_completed_dealloc,
(void *) c_p);
return 1;
}
return 0;
#endif
}
#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN
void
erts_smp_notify_check_children_needed(void)
{
int i;
for (i = 0; i < erts_no_schedulers; i++)
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(i),
ERTS_SSI_AUX_WORK_CHECK_CHILDREN);
}
static ERTS_INLINE erts_aint32_t
handle_check_children(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_CHECK_CHILDREN);
erts_check_children();
return aux_work & ~ERTS_SSI_AUX_WORK_CHECK_CHILDREN;
}
#endif
static void
notify_reap_ports_relb(void)
{
int i;
for (i = 0; i < erts_no_schedulers; i++) {
set_aux_work_flags_wakeup_relb(ERTS_SCHED_SLEEP_INFO_IX(i),
ERTS_SSI_AUX_WORK_REAP_PORTS);
}
}
erts_smp_atomic32_t erts_halt_progress;
int erts_halt_code;
static ERTS_INLINE erts_aint32_t
handle_reap_ports(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_REAP_PORTS);
awdp->esdp->run_queue->halt_in_progress = 1;
if (erts_smp_atomic32_dec_read_acqb(&erts_halt_progress) == 0) {
int i;
erts_smp_atomic32_set_nob(&erts_halt_progress, 1);
for (i = 0; i < erts_max_ports; i++) {
Port *prt = &erts_port[i];
erts_smp_port_state_lock(prt);
if ((prt->status & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP
| ERTS_PORT_SFLG_HALT))) {
erts_smp_port_state_unlock(prt);
continue;
}
/* We need to set the halt flag - get the port lock */
#ifdef ERTS_SMP
erts_smp_atomic_inc_nob(&prt->refc);
#endif
erts_smp_port_state_unlock(prt);
#ifdef ERTS_SMP
erts_smp_mtx_lock(prt->lock);
#endif
if ((prt->status & (ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP
| ERTS_PORT_SFLG_HALT))) {
erts_port_release(prt);
continue;
}
erts_port_status_bor_set(prt, ERTS_PORT_SFLG_HALT);
erts_smp_atomic32_inc_nob(&erts_halt_progress);
if (prt->status & (ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_CLOSING)) {
erts_port_release(prt);
continue;
}
erts_do_exit_port(prt, prt->id, am_killed);
erts_port_release(prt);
}
if (erts_smp_atomic32_dec_read_nob(&erts_halt_progress) == 0) {
erl_exit_flush_async(erts_halt_code, "");
}
}
return aux_work & ~ERTS_SSI_AUX_WORK_REAP_PORTS;
}
#if HAVE_ERTS_MSEG
static ERTS_INLINE erts_aint32_t
handle_mseg_cache_check(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK);
erts_mseg_cache_check();
return aux_work & ~ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK;
}
#endif
static ERTS_INLINE erts_aint32_t
handle_setup_aux_work_timer(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_SET_TMO);
setup_aux_work_timer();
return aux_work & ~ERTS_SSI_AUX_WORK_SET_TMO;
}
static erts_aint32_t
handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t orig_aux_work, int waiting)
{
#undef HANDLE_AUX_WORK
#define HANDLE_AUX_WORK(FLG, HNDLR) \
ignore |= FLG; \
if (aux_work & FLG) { \
aux_work = HNDLR(awdp, aux_work, waiting); \
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); \
if (!(aux_work & ~ignore)) { \
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); \
return aux_work; \
} \
}
erts_aint32_t aux_work = orig_aux_work;
erts_aint32_t ignore = 0;
#ifdef ERTS_SMP
haw_thr_prgr_current_reset(awdp);
#endif
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
ASSERT(aux_work);
/*
* Handlers are *only* allowed to modify flags in return value
* and ssi flags that are explicity handled by the handler.
* Handlers are, e.g., not allowed to read the ssi flag field and
* then unconditionally return that value.
*
* Flag field returned should only contain flags for work that
* can continue immediately.
*/
/*
* Keep ERTS_SSI_AUX_WORK flags in expected frequency order relative
* eachother. Most frequent first.
*/
#ifdef ERTS_SMP
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP,
handle_delayed_aux_work_wakeup);
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_DD,
handle_delayed_dealloc);
/* DD must be before DD_THR_PRGR */
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_DD_THR_PRGR,
handle_delayed_dealloc_thr_prgr);
#endif
HANDLE_AUX_WORK((ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM
| ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC),
handle_fix_alloc);
#if ERTS_USE_ASYNC_READY_Q
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_ASYNC_READY,
handle_async_ready);
/* ASYNC_READY must be before ASYNC_READY_CLEAN */
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN,
handle_async_ready_clean);
#endif
#ifdef ERTS_SMP
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_MISC_THR_PRGR,
handle_misc_aux_work_thr_prgr);
#endif
/* MISC_THR_PRGR must be before MISC */
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_MISC,
handle_misc_aux_work);
#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_CHECK_CHILDREN,
handle_check_children);
#endif
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_SET_TMO,
handle_setup_aux_work_timer);
#if HAVE_ERTS_MSEG
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK,
handle_mseg_cache_check);
#endif
HANDLE_AUX_WORK(ERTS_SSI_AUX_WORK_REAP_PORTS,
handle_reap_ports);
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
#ifdef ERTS_SMP
if (waiting && !aux_work)
haw_thr_prgr_current_check_progress(awdp);
#endif
return aux_work;
#undef HANDLE_AUX_WORK
}
typedef struct {
union {
ErlTimer data;
char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErlTimer))];
} timer;
int initialized;
erts_atomic32_t refc;
erts_atomic32_t type[1];
} ErtsAuxWorkTmo;
static ErtsAuxWorkTmo *aux_work_tmo;
static void
aux_work_timeout_early_init(int no_schedulers)
{
int i;
UWord p;
/*
* This is done really early. Our own allocators have
* not been started yet.
*/
p = (UWord) malloc((sizeof(ErtsAuxWorkTmo)
+ sizeof(erts_atomic32_t)*(no_schedulers+1))
+ ERTS_CACHE_LINE_SIZE-1);
if (p & ERTS_CACHE_LINE_MASK)
p = (p & ~ERTS_CACHE_LINE_MASK) + ERTS_CACHE_LINE_SIZE;
ASSERT((p & ERTS_CACHE_LINE_MASK) == 0);
aux_work_tmo = (ErtsAuxWorkTmo *) p;
aux_work_tmo->initialized = 0;
erts_atomic32_init_nob(&aux_work_tmo->refc, 0);
for (i = 0; i <= no_schedulers; i++)
erts_atomic32_init_nob(&aux_work_tmo->type[i], 0);
}
void
aux_work_timeout_late_init(void)
{
aux_work_tmo->initialized = 1;
if (erts_atomic32_read_nob(&aux_work_tmo->refc)) {
aux_work_tmo->timer.data.active = 0;
erts_set_timer(&aux_work_tmo->timer.data,
aux_work_timeout,
NULL,
NULL,
1000);
}
}
static void
aux_work_timeout(void *unused)
{
erts_aint32_t refc;
int i;
#ifdef ERTS_SMP
i = 0;
#else
i = 1;
#endif
for (; i <= erts_no_schedulers; i++) {
erts_aint32_t type;
type = erts_atomic32_read_acqb(&aux_work_tmo->type[i]);
if (type)
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(i-1),
type);
}
refc = erts_atomic32_read_nob(&aux_work_tmo->refc);
ASSERT(refc >= 1);
if (refc != 1
|| 1 != erts_atomic32_cmpxchg_relb(&aux_work_tmo->refc, 0, 1)) {
/* Setup next timeout... */
aux_work_tmo->timer.data.active = 0;
erts_set_timer(&aux_work_tmo->timer.data,
aux_work_timeout,
NULL,
NULL,
1000);
}
}
static void
setup_aux_work_timer(void)
{
#ifndef ERTS_SMP
if (!erts_get_scheduler_data())
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(0),
ERTS_SSI_AUX_WORK_SET_TMO);
else
#endif
{
aux_work_tmo->timer.data.active = 0;
erts_set_timer(&aux_work_tmo->timer.data,
aux_work_timeout,
NULL,
NULL,
1000);
}
}
erts_aint32_t
erts_set_aux_work_timeout(int ix, erts_aint32_t type, int enable)
{
erts_aint32_t old, refc;
#ifndef ERTS_SMP
ix = 1;
#endif
ERTS_DBG_CHK_AUX_WORK_VAL(type);
ERTS_DBG_CHK_AUX_WORK_VAL(erts_atomic32_read_nob(&aux_work_tmo->type[ix]));
// erts_fprintf(stderr, "t(%d, 0x%x, %d)\n", ix, type, enable);
if (!enable) {
old = erts_atomic32_read_band_mb(&aux_work_tmo->type[ix], ~type);
ERTS_DBG_CHK_AUX_WORK_VAL(erts_atomic32_read_nob(&aux_work_tmo->type[ix]));
if (old != 0 && (old & ~type) == 0)
erts_atomic32_dec_relb(&aux_work_tmo->refc);
return old;
}
old = erts_atomic32_read_bor_mb(&aux_work_tmo->type[ix], type);
ERTS_DBG_CHK_AUX_WORK_VAL(erts_atomic32_read_nob(&aux_work_tmo->type[ix]));
if (old == 0 && type != 0) {
refc = erts_atomic32_inc_read_acqb(&aux_work_tmo->refc);
if (refc == 1) {
erts_atomic32_inc_acqb(&aux_work_tmo->refc);
if (aux_work_tmo->initialized)
setup_aux_work_timer();
}
}
return old;
}
static ERTS_INLINE void
sched_waiting_sys(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
ASSERT(rq->waiting >= 0);
rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK
| ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
rq->waiting++;
rq->waiting *= -1;
rq->woken = 0;
if (erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_inactive);
}
static ERTS_INLINE void
sched_active_sys(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
ASSERT(rq->waiting < 0);
rq->waiting *= -1;
rq->waiting--;
if (erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_active);
}
Uint
erts_active_schedulers(void)
{
Uint as = erts_no_schedulers;
ERTS_ATOMIC_FOREACH_RUNQ(rq, as -= abs(rq->waiting));
ASSERT(as >= 0);
return as;
}
#ifdef ERTS_SMP
static ERTS_INLINE void
clear_sys_scheduling(void)
{
erts_smp_atomic32_set_mb(&doing_sys_schedule, 0);
}
static ERTS_INLINE int
try_set_sys_scheduling(void)
{
return 0 == erts_smp_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0);
}
#endif
static ERTS_INLINE int
prepare_for_sys_schedule(void)
{
#ifdef ERTS_SMP
while (!erts_port_task_have_outstanding_io_tasks()
&& try_set_sys_scheduling()) {
if (!erts_port_task_have_outstanding_io_tasks())
return 1;
clear_sys_scheduling();
}
return 0;
#else
return !erts_port_task_have_outstanding_io_tasks();
#endif
}
#ifdef ERTS_SMP
static ERTS_INLINE void
sched_change_waiting_sys_to_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
ASSERT(rq->waiting < 0);
rq->waiting *= -1;
}
static ERTS_INLINE void
sched_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK
| ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
if (rq->waiting < 0)
rq->waiting--;
else
rq->waiting++;
rq->woken = 0;
if (erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_inactive);
}
static ERTS_INLINE void
sched_active(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
if (rq->waiting < 0)
rq->waiting++;
else
rq->waiting--;
if (erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_active);
}
static int ERTS_INLINE
ongoing_multi_scheduling_block(void)
{
ERTS_SMP_LC_ASSERT(erts_lc_mtx_is_locked(&schdlr_sspnd.mtx));
return schdlr_sspnd.msb.ongoing;
}
static ERTS_INLINE void
empty_runq(ErtsRunQueue *rq)
{
erts_aint32_t oifls = erts_smp_atomic32_read_band_nob(&rq->info_flags,
~ERTS_RUNQ_IFLG_NONEMPTY);
if (oifls & ERTS_RUNQ_IFLG_NONEMPTY) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
* For a short period of time no_empty_run_queues may have
* been increased twice for a specific run queue.
*/
ASSERT(0 <= empty && empty < 2*erts_no_run_queues);
#endif
erts_smp_atomic32_inc_relb(&no_empty_run_queues);
}
}
static ERTS_INLINE void
non_empty_runq(ErtsRunQueue *rq)
{
erts_aint32_t oifls = erts_smp_atomic32_read_bor_nob(&rq->info_flags,
ERTS_RUNQ_IFLG_NONEMPTY);
if (!(oifls & ERTS_RUNQ_IFLG_NONEMPTY)) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
* For a short period of time no_empty_run_queues may have
* been increased twice for a specific run queue.
*/
ASSERT(0 < empty && empty <= 2*erts_no_run_queues);
#endif
erts_smp_atomic32_dec_relb(&no_empty_run_queues);
}
}
static erts_aint32_t
sched_prep_spin_wait(ErtsSchedulerSleepInfo *ssi)
{
erts_aint32_t oflgs;
erts_aint32_t nflgs = (ERTS_SSI_FLG_SLEEPING
| ERTS_SSI_FLG_WAITING);
erts_aint32_t xflgs = 0;
do {
oflgs = erts_smp_atomic32_cmpxchg_acqb(&ssi->flags, nflgs, xflgs);
if (oflgs == xflgs)
return nflgs;
xflgs = oflgs;
} while (!(oflgs & ERTS_SSI_FLG_SUSPENDED));
return oflgs;
}
static erts_aint32_t
sched_prep_cont_spin_wait(ErtsSchedulerSleepInfo *ssi)
{
erts_aint32_t oflgs;
erts_aint32_t nflgs = (ERTS_SSI_FLG_SLEEPING
| ERTS_SSI_FLG_WAITING);
erts_aint32_t xflgs = ERTS_SSI_FLG_WAITING;
do {
oflgs = erts_smp_atomic32_cmpxchg_acqb(&ssi->flags, nflgs, xflgs);
if (oflgs == xflgs)
return nflgs;
xflgs = oflgs;
nflgs |= oflgs & ERTS_SSI_FLG_SUSPENDED;
} while (oflgs & ERTS_SSI_FLG_WAITING);
return oflgs;
}
static erts_aint32_t
sched_spin_wait(ErtsSchedulerSleepInfo *ssi, int spincount)
{
int until_yield = ERTS_SCHED_SPIN_UNTIL_YIELD;
int sc = spincount;
erts_aint32_t flgs;
do {
flgs = erts_smp_atomic32_read_acqb(&ssi->flags);
if ((flgs & (ERTS_SSI_FLG_SLEEPING|ERTS_SSI_FLG_WAITING))
!= (ERTS_SSI_FLG_SLEEPING|ERTS_SSI_FLG_WAITING)) {
break;
}
ERTS_SPIN_BODY;
if (--until_yield == 0) {
until_yield = ERTS_SCHED_SPIN_UNTIL_YIELD;
erts_thr_yield();
}
} while (--sc > 0);
return flgs;
}
static erts_aint32_t
sched_set_sleeptype(ErtsSchedulerSleepInfo *ssi, erts_aint32_t sleep_type)
{
erts_aint32_t oflgs;
erts_aint32_t nflgs = ERTS_SSI_FLG_SLEEPING|ERTS_SSI_FLG_WAITING|sleep_type;
erts_aint32_t xflgs = ERTS_SSI_FLG_SLEEPING|ERTS_SSI_FLG_WAITING;
if (sleep_type == ERTS_SSI_FLG_TSE_SLEEPING)
erts_tse_reset(ssi->event);
else {
ASSERT(sleep_type == ERTS_SSI_FLG_POLL_SLEEPING);
erts_sys_schedule_interrupt(0);
}
while (1) {
oflgs = erts_smp_atomic32_cmpxchg_acqb(&ssi->flags, nflgs, xflgs);
if (oflgs == xflgs)
return nflgs;
if ((oflgs & (ERTS_SSI_FLG_SLEEPING|ERTS_SSI_FLG_WAITING))
!= (ERTS_SSI_FLG_SLEEPING|ERTS_SSI_FLG_WAITING)) {
return oflgs;
}
xflgs = oflgs;
nflgs |= oflgs & ERTS_SSI_FLG_SUSPENDED;
}
}
#define ERTS_SCHED_WAIT_WOKEN(FLGS) \
(((FLGS) & (ERTS_SSI_FLG_WAITING|ERTS_SSI_FLG_SUSPENDED)) \
!= ERTS_SSI_FLG_WAITING)
static void
thr_prgr_wakeup(void *vssi)
{
erts_sched_poke((ErtsSchedulerSleepInfo *) vssi);
}
static void
thr_prgr_prep_wait(void *vssi)
{
ErtsSchedulerSleepInfo *ssi = (ErtsSchedulerSleepInfo *) vssi;
erts_smp_atomic32_read_bor_acqb(&ssi->flags,
ERTS_SSI_FLG_SLEEPING);
}
static void
thr_prgr_wait(void *vssi)
{
ErtsSchedulerSleepInfo *ssi = (ErtsSchedulerSleepInfo *) vssi;
erts_aint32_t xflgs = ERTS_SSI_FLG_SLEEPING;
erts_tse_reset(ssi->event);
while (1) {
erts_aint32_t aflgs, nflgs;
nflgs = xflgs | ERTS_SSI_FLG_TSE_SLEEPING;
aflgs = erts_smp_atomic32_cmpxchg_acqb(&ssi->flags, nflgs, xflgs);
if (aflgs == xflgs) {
erts_tse_wait(ssi->event);
break;
}
if ((aflgs & ERTS_SSI_FLG_SLEEPING) == 0)
break;
xflgs = aflgs;
}
}
static void
thr_prgr_fin_wait(void *vssi)
{
ErtsSchedulerSleepInfo *ssi = (ErtsSchedulerSleepInfo *) vssi;
erts_smp_atomic32_read_band_nob(&ssi->flags,
~(ERTS_SSI_FLG_SLEEPING
| ERTS_SSI_FLG_TSE_SLEEPING));
}
static void init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp);
static void *
aux_thread(void *unused)
{
ErtsAuxWorkData *awdp = aux_thread_aux_work_data;
ErtsSchedulerSleepInfo *ssi = ERTS_SCHED_SLEEP_INFO_IX(-1);
erts_aint32_t aux_work;
ErtsThrPrgrCallbacks callbacks;
int thr_prgr_active = 1;
ssi->event = erts_tse_fetch();
callbacks.arg = (void *) ssi;
callbacks.wakeup = thr_prgr_wakeup;
callbacks.prepare_wait = thr_prgr_prep_wait;
callbacks.wait = thr_prgr_wait;
callbacks.finalize_wait = thr_prgr_fin_wait;
erts_thr_progress_register_managed_thread(NULL, &callbacks, 1);
init_aux_work_data(awdp, NULL, NULL);
awdp->ssi = ssi;
sched_prep_spin_wait(ssi);
while (1) {
erts_aint32_t flgs;
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
if (aux_work) {
if (!thr_prgr_active)
erts_thr_progress_active(NULL, thr_prgr_active = 1);
aux_work = handle_aux_work(awdp, aux_work, 1);
if (aux_work && erts_thr_progress_update(NULL))
erts_thr_progress_leader_update(NULL);
}
if (!aux_work) {
if (thr_prgr_active)
erts_thr_progress_active(NULL, thr_prgr_active = 0);
erts_thr_progress_prepare_wait(NULL);
flgs = sched_spin_wait(ssi, 0);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_TSE_SLEEPING);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
int res;
ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING);
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
do {
res = erts_tse_wait(ssi->event);
} while (res == EINTR);
}
}
erts_thr_progress_finalize_wait(NULL);
}
flgs = sched_prep_spin_wait(ssi);
}
return NULL;
}
#endif /* ERTS_SMP */
static void
scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
{
int working = 1;
ErtsSchedulerSleepInfo *ssi = esdp->ssi;
int spincount;
erts_aint32_t aux_work = 0;
#ifdef ERTS_SMP
int thr_prgr_active = 1;
erts_aint32_t flgs;
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
flgs = sched_prep_spin_wait(ssi);
if (flgs & ERTS_SSI_FLG_SUSPENDED) {
/* Go suspend instead... */
return;
}
/*
* If all schedulers are waiting, one of them *should*
* be waiting in erl_sys_schedule()
*/
if (!prepare_for_sys_schedule()) {
sched_waiting(esdp->no, rq);
erts_smp_runq_unlock(rq);
spincount = sched_busy_wait.tse;
tse_wait:
if (thr_prgr_active != working)
sched_wall_time_change(esdp, thr_prgr_active);
while (1) {
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
if (aux_work) {
if (!thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work, 1);
if (aux_work && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
}
if (aux_work)
flgs = erts_smp_atomic32_read_acqb(&ssi->flags);
else {
if (thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 0);
sched_wall_time_change(esdp, 0);
}
erts_thr_progress_prepare_wait(esdp);
flgs = sched_spin_wait(ssi, spincount);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_TSE_SLEEPING);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
int res;
ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING);
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
do {
res = erts_tse_wait(ssi->event);
} while (res == EINTR);
}
}
erts_thr_progress_finalize_wait(esdp);
}
if (!(flgs & ERTS_SSI_FLG_WAITING)) {
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
break;
}
flgs = sched_prep_cont_spin_wait(ssi);
spincount = sched_busy_wait.aux_work;
if (!(flgs & ERTS_SSI_FLG_WAITING)) {
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
break;
}
}
if (flgs & ~ERTS_SSI_FLG_SUSPENDED)
erts_smp_atomic32_read_band_nob(&ssi->flags, ERTS_SSI_FLG_SUSPENDED);
if (!thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
erts_smp_runq_lock(rq);
sched_active(esdp->no, rq);
}
else
#endif
{
erts_aint_t dt;
erts_smp_atomic32_set_relb(&function_calls, 0);
*fcalls = 0;
sched_waiting_sys(esdp->no, rq);
erts_smp_runq_unlock(rq);
ASSERT(working);
sched_wall_time_change(esdp, working = 0);
spincount = sched_busy_wait.sys_schedule;
if (spincount == 0)
goto sys_aux_work;
while (spincount-- > 0) {
sys_poll_aux_work:
if (working)
sched_wall_time_change(esdp, working = 0);
ASSERT(!erts_port_task_have_outstanding_io_tasks());
erl_sys_schedule(1); /* Might give us something to do */
dt = erts_do_time_read_and_reset();
if (dt) erts_bump_timer(dt);
sys_aux_work:
#ifndef ERTS_SMP
erts_sys_schedule_interrupt(0);
#endif
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
if (aux_work) {
if (!working)
sched_wall_time_change(esdp, working = 1);
#ifdef ERTS_SMP
if (!thr_prgr_active)
erts_thr_progress_active(esdp, thr_prgr_active = 1);
#endif
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work, 1);
#ifdef ERTS_SMP
if (aux_work && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
#endif
}
#ifndef ERTS_SMP
if (rq->len != 0 || rq->misc.start)
goto sys_woken;
#else
flgs = erts_smp_atomic32_read_acqb(&ssi->flags);
if (!(flgs & ERTS_SSI_FLG_WAITING)) {
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
goto sys_woken;
}
/*
* If we got new I/O tasks we aren't allowed to
* call erl_sys_schedule() until it is handled.
*/
if (erts_port_task_have_outstanding_io_tasks()) {
clear_sys_scheduling();
/*
* Got to check that we still got I/O tasks; otherwise
* we have to continue checking for I/O...
*/
if (!prepare_for_sys_schedule()) {
spincount *= ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT;
goto tse_wait;
}
}
#endif
}
erts_smp_runq_lock(rq);
#ifdef ERTS_SMP
/*
* If we got new I/O tasks we aren't allowed to
* sleep in erl_sys_schedule().
*/
if (erts_port_task_have_outstanding_io_tasks()) {
clear_sys_scheduling();
/*
* Got to check that we still got I/O tasks; otherwise
* we have to wait in erl_sys_schedule() after all...
*/
if (!prepare_for_sys_schedule()) {
/*
* Not allowed to wait in erl_sys_schedule;
* do tse wait instead...
*/
sched_change_waiting_sys_to_waiting(esdp->no, rq);
erts_smp_runq_unlock(rq);
spincount = 0;
goto tse_wait;
}
}
#endif
if (aux_work) {
erts_smp_runq_unlock(rq);
goto sys_poll_aux_work;
}
#ifdef ERTS_SMP
flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_POLL_SLEEPING);
if (!(flgs & ERTS_SSI_FLG_SLEEPING)) {
if (!(flgs & ERTS_SSI_FLG_WAITING)) {
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
goto sys_locked_woken;
}
erts_smp_runq_unlock(rq);
flgs = sched_prep_cont_spin_wait(ssi);
if (!(flgs & ERTS_SSI_FLG_WAITING)) {
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
goto sys_woken;
}
ASSERT(!erts_port_task_have_outstanding_io_tasks());
goto sys_poll_aux_work;
}
ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING);
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
#endif
erts_smp_runq_unlock(rq);
if (working)
sched_wall_time_change(esdp, working = 0);
#ifdef ERTS_SMP
if (thr_prgr_active)
erts_thr_progress_active(esdp, thr_prgr_active = 0);
#endif
ASSERT(!erts_port_task_have_outstanding_io_tasks());
erl_sys_schedule(0);
dt = erts_do_time_read_and_reset();
if (dt) erts_bump_timer(dt);
#ifndef ERTS_SMP
if (rq->len == 0 && !rq->misc.start)
goto sys_aux_work;
sys_woken:
#else
flgs = sched_prep_cont_spin_wait(ssi);
if (flgs & ERTS_SSI_FLG_WAITING)
goto sys_aux_work;
sys_woken:
if (!thr_prgr_active)
erts_thr_progress_active(esdp, thr_prgr_active = 1);
erts_smp_runq_lock(rq);
sys_locked_woken:
if (!thr_prgr_active) {
erts_smp_runq_unlock(rq);
erts_thr_progress_active(esdp, thr_prgr_active = 1);
erts_smp_runq_lock(rq);
}
clear_sys_scheduling();
if (flgs & ~ERTS_SSI_FLG_SUSPENDED)
erts_smp_atomic32_read_band_nob(&ssi->flags, ERTS_SSI_FLG_SUSPENDED);
#endif
if (!working)
sched_wall_time_change(esdp, working = 1);
sched_active_sys(esdp->no, rq);
}
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
}
#ifdef ERTS_SMP
static ERTS_INLINE erts_aint32_t
ssi_flags_set_wake(ErtsSchedulerSleepInfo *ssi)
{
/* reset all flags but suspended */
erts_aint32_t oflgs;
erts_aint32_t nflgs = 0;
erts_aint32_t xflgs = ERTS_SSI_FLG_SLEEPING|ERTS_SSI_FLG_WAITING;
while (1) {
oflgs = erts_smp_atomic32_cmpxchg_relb(&ssi->flags, nflgs, xflgs);
if (oflgs == xflgs)
return oflgs;
nflgs = oflgs & ERTS_SSI_FLG_SUSPENDED;
xflgs = oflgs;
}
}
static void
wake_scheduler(ErtsRunQueue *rq, int incq)
{
ErtsSchedulerSleepInfo *ssi;
erts_aint32_t flgs;
/*
* The unlocked run queue is not strictly necessary
* from a thread safety or deadlock prevention
* perspective. It will, however, cost us performance
* if it is locked during wakup of another scheduler,
* so all code *should* handle this without having
* the lock on the run queue.
*/
ERTS_SMP_LC_ASSERT(!erts_smp_lc_runq_is_locked(rq));
ssi = rq->scheduler->ssi;
flgs = ssi_flags_set_wake(ssi);
erts_sched_finish_poke(ssi, flgs);
if (incq && (flgs & ERTS_SSI_FLG_WAITING))
non_empty_runq(rq);
}
#define ERTS_NO_USED_RUNQS_SHIFT 16
#define ERTS_NO_RUNQS_MASK 0xffff
#if ERTS_MAX_NO_OF_SCHEDULERS > ERTS_NO_RUNQS_MASK
# error "Too large amount of schedulers allowed"
#endif
static ERTS_INLINE void
init_no_runqs(int active, int used)
{
erts_aint32_t no_runqs = (erts_aint32_t) (active & ERTS_NO_RUNQS_MASK);
no_runqs |= (erts_aint32_t) ((used & ERTS_NO_RUNQS_MASK) << ERTS_NO_USED_RUNQS_SHIFT);
erts_smp_atomic32_init_nob(&balance_info.no_runqs, no_runqs);
}
static ERTS_INLINE void
get_no_runqs(int *active, int *used)
{
erts_aint32_t no_runqs = erts_smp_atomic32_read_nob(&balance_info.no_runqs);
if (active)
*active = (int) (no_runqs & ERTS_NO_RUNQS_MASK);
if (used)
*used = (int) ((no_runqs >> ERTS_NO_USED_RUNQS_SHIFT) & ERTS_NO_RUNQS_MASK);
}
static ERTS_INLINE void
set_no_used_runqs(int used)
{
erts_aint32_t exp = erts_smp_atomic32_read_nob(&balance_info.no_runqs);
while (1) {
erts_aint32_t act, new;
new = (used & ERTS_NO_RUNQS_MASK) << ERTS_NO_USED_RUNQS_SHIFT;
new |= exp & ERTS_NO_RUNQS_MASK;
act = erts_smp_atomic32_cmpxchg_nob(&balance_info.no_runqs, new, exp);
if (act == exp)
break;
exp = act;
}
}
static ERTS_INLINE void
set_no_active_runqs(int active)
{
erts_aint32_t exp = erts_smp_atomic32_read_nob(&balance_info.no_runqs);
while (1) {
erts_aint32_t act, new;
new = exp & (ERTS_NO_RUNQS_MASK << ERTS_NO_USED_RUNQS_SHIFT);
new |= active & ERTS_NO_RUNQS_MASK;
act = erts_smp_atomic32_cmpxchg_nob(&balance_info.no_runqs, new, exp);
if (act == exp)
break;
exp = act;
}
}
static ERTS_INLINE int
try_inc_no_active_runqs(int active)
{
erts_aint32_t exp = erts_smp_atomic32_read_nob(&balance_info.no_runqs);
if (((exp >> ERTS_NO_USED_RUNQS_SHIFT) & ERTS_NO_RUNQS_MASK) < active)
return 0;
if ((exp & ERTS_NO_RUNQS_MASK) + 1 == active) {
erts_aint32_t new, act;
new = exp & (ERTS_NO_RUNQS_MASK << ERTS_NO_USED_RUNQS_SHIFT);
new |= active & ERTS_NO_RUNQS_MASK;
act = erts_smp_atomic32_cmpxchg_nob(&balance_info.no_runqs, new, exp);
if (act == exp)
return 1;
}
return 0;
}
static ERTS_INLINE int
chk_wake_sched(ErtsRunQueue *crq, int ix, int activate)
{
erts_aint32_t iflgs;
ErtsRunQueue *wrq;
if (crq->ix == ix)
return 0;
wrq = ERTS_RUNQ_IX(ix);
iflgs = erts_smp_atomic32_read_nob(&wrq->info_flags);
if (!(iflgs & (ERTS_RUNQ_IFLG_SUSPENDED|ERTS_RUNQ_IFLG_NONEMPTY))) {
if (activate) {
if (try_inc_no_active_runqs(ix+1)) {
erts_smp_xrunq_lock(crq, wrq);
wrq->flags &= ~ERTS_RUNQ_FLG_INACTIVE;
erts_smp_xrunq_unlock(crq, wrq);
}
}
wake_scheduler(wrq, 0);
return 1;
}
return 0;
}
static void
wake_scheduler_on_empty_runq(ErtsRunQueue *crq)
{
int ix = crq->ix;
int stop_ix = ix;
int active_ix, balance_ix;
get_no_runqs(&active_ix, &balance_ix);
if (active_ix > balance_ix)
active_ix = balance_ix;
if (ix >= active_ix)
stop_ix = ix = active_ix;
/* Try to wake a scheduler on an active run queue */
while (1) {
ix--;
if (ix < 0) {
if (active_ix == stop_ix)
break;
ix = active_ix - 1;
}
if (ix == stop_ix)
break;
if (chk_wake_sched(crq, ix, 0))
return;
}
if (active_ix < balance_ix) {
/* Try to activate a new run queue and wake its scheduler */
(void) chk_wake_sched(crq, active_ix, 1);
}
}
#endif /* ERTS_SMP */
static ERTS_INLINE void
smp_notify_inc_runq(ErtsRunQueue *runq)
{
#ifdef ERTS_SMP
if (runq)
wake_scheduler(runq, 1);
#endif
}
void
erts_smp_notify_inc_runq(ErtsRunQueue *runq)
{
smp_notify_inc_runq(runq);
}
void
erts_sched_notify_check_cpu_bind(void)
{
#ifdef ERTS_SMP
int ix;
for (ix = 0; ix < erts_no_run_queues; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
erts_smp_runq_lock(rq);
rq->flags |= ERTS_RUNQ_FLG_CHK_CPU_BIND;
erts_smp_runq_unlock(rq);
wake_scheduler(rq, 0);
}
#else
erts_sched_check_cpu_bind(erts_get_scheduler_data());
#endif
}
#ifdef ERTS_SMP
ErtsRunQueue *
erts_prepare_emigrate(ErtsRunQueue *c_rq, ErtsRunQueueInfo *c_rqi, int prio)
{
ASSERT(ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio));
ASSERT(ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio)
|| c_rqi->len >= c_rqi->migrate.limit.this);
while (1) {
ErtsRunQueue *n_rq = c_rqi->migrate.runq;
ERTS_DBG_VERIFY_VALID_RUNQP(n_rq);
erts_smp_xrunq_lock(c_rq, n_rq);
/*
* erts_smp_xrunq_lock() may release lock on c_rq! We have
* to check that we still want to emigrate and emigrate
* to the same run queue as before.
*/
if (ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)) {
Uint32 force = (ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio)
| (c_rq->flags & ERTS_RUNQ_FLG_INACTIVE));
if (force || c_rqi->len > c_rqi->migrate.limit.this) {
ErtsRunQueueInfo *n_rqi;
/* We still want to emigrate */
if (n_rq != c_rqi->migrate.runq) {
/* Ahh... run queue changed; need to do it all over again... */
erts_smp_runq_unlock(n_rq);
continue;
}
else {
if (prio == ERTS_PORT_PRIO_LEVEL)
n_rqi = &n_rq->ports.info;
else
n_rqi = &n_rq->procs.prio_info[prio];
if (force || (n_rqi->len < c_rqi->migrate.limit.other)) {
/* emigrate ... */
return n_rq;
}
}
}
}
ASSERT(n_rq != c_rq);
erts_smp_runq_unlock(n_rq);
if (!(c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)) {
/* No more emigrations to this runq */
ERTS_UNSET_RUNQ_FLG_EMIGRATE(c_rq->flags, prio);
ERTS_DBG_SET_INVALID_RUNQP(c_rqi->migrate.runq, 0x3);
}
return NULL;
}
}
static void
immigrate(ErtsRunQueue *rq)
{
int prio;
ASSERT(rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK);
for (prio = 0; prio < ERTS_NO_PRIO_LEVELS; prio++) {
if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)) {
ErtsRunQueueInfo *rqi = (prio == ERTS_PORT_PRIO_LEVEL
? &rq->ports.info
: &rq->procs.prio_info[prio]);
ErtsRunQueue *from_rq = rqi->migrate.runq;
int rq_locked, from_rq_locked;
ERTS_DBG_VERIFY_VALID_RUNQP(from_rq);
rq_locked = 1;
from_rq_locked = 1;
erts_smp_xrunq_lock(rq, from_rq);
/*
* erts_smp_xrunq_lock() may release lock on rq! We have
* to check that we still want to immigrate from the same
* run queue as before.
*/
if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)
&& from_rq == rqi->migrate.runq) {
ErtsRunQueueInfo *from_rqi = (prio == ERTS_PORT_PRIO_LEVEL
? &from_rq->ports.info
: &from_rq->procs.prio_info[prio]);
if ((ERTS_CHK_RUNQ_FLG_EVACUATE(rq->flags, prio)
&& ERTS_CHK_RUNQ_FLG_EVACUATE(from_rq->flags, prio)
&& from_rqi->len)
|| (from_rqi->len > rqi->migrate.limit.other
&& rqi->len < rqi->migrate.limit.this)) {
if (prio == ERTS_PORT_PRIO_LEVEL) {
Port *prt = from_rq->ports.start;
if (prt) {
int prt_locked = 0;
(void) erts_port_migrate(prt, &prt_locked,
from_rq, &from_rq_locked,
rq, &rq_locked);
if (prt_locked)
erts_smp_port_unlock(prt);
}
}
else {
Process *proc;
ErtsRunPrioQueue *from_rpq;
from_rpq = (prio == PRIORITY_LOW
? &from_rq->procs.prio[PRIORITY_NORMAL]
: &from_rq->procs.prio[prio]);
for (proc = from_rpq->first; proc; proc = proc->next)
if (proc->prio == prio && !proc->bound_runq)
break;
if (proc) {
ErtsProcLocks proc_locks = 0;
(void) erts_proc_migrate(proc, &proc_locks,
from_rq, &from_rq_locked,
rq, &rq_locked);
if (proc_locks)
erts_smp_proc_unlock(proc, proc_locks);
}
}
}
else {
ERTS_UNSET_RUNQ_FLG_IMMIGRATE(rq->flags, prio);
ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x1);
}
}
if (from_rq_locked)
erts_smp_runq_unlock(from_rq);
if (!rq_locked)
erts_smp_runq_lock(rq);
}
}
}
static void
evacuate_run_queue(ErtsRunQueue *evac_rq, ErtsRunQueue *rq)
{
Port *prt;
int notify_to_rq = 0;
int prio;
int prt_locked = 0;
int rq_locked = 0;
int evac_rq_locked = 1;
ErtsMigrateResult mres;
erts_smp_runq_lock(evac_rq);
erts_smp_atomic32_read_bor_nob(&evac_rq->scheduler->ssi->flags,
ERTS_SSI_FLG_SUSPENDED);
evac_rq->flags &= ~ERTS_RUNQ_FLGS_IMMIGRATE_QMASK;
evac_rq->flags |= (ERTS_RUNQ_FLGS_EMIGRATE_QMASK
| ERTS_RUNQ_FLGS_EVACUATE_QMASK
| ERTS_RUNQ_FLG_SUSPENDED);
erts_smp_atomic32_read_bor_nob(&evac_rq->info_flags, ERTS_RUNQ_IFLG_SUSPENDED);
/*
* Need to set up evacuation paths first since we
* may release the run queue lock on evac_rq
* when evacuating.
*/
evac_rq->misc.evac_runq = rq;
evac_rq->ports.info.migrate.runq = rq;
for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++)
evac_rq->procs.prio_info[prio].migrate.runq = rq;
/* Evacuate scheduled misc ops */
if (evac_rq->misc.start) {
rq_locked = 1;
erts_smp_xrunq_lock(evac_rq, rq);
if (rq->misc.end)
rq->misc.end->next = evac_rq->misc.start;
else
rq->misc.start = evac_rq->misc.start;
rq->misc.end = evac_rq->misc.end;
evac_rq->misc.start = NULL;
evac_rq->misc.end = NULL;
}
/* Evacuate scheduled ports */
prt = evac_rq->ports.start;
while (prt) {
mres = erts_port_migrate(prt, &prt_locked,
evac_rq, &evac_rq_locked,
rq, &rq_locked);
if (mres == ERTS_MIGRATE_SUCCESS)
notify_to_rq = 1;
if (prt_locked)
erts_smp_port_unlock(prt);
if (!evac_rq_locked) {
evac_rq_locked = 1;
erts_smp_runq_lock(evac_rq);
}
prt = evac_rq->ports.start;
}
/* Evacuate scheduled processes */
for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) {
Process *proc;
switch (prio) {
case PRIORITY_MAX:
case PRIORITY_HIGH:
case PRIORITY_NORMAL:
proc = evac_rq->procs.prio[prio].first;
while (proc) {
ErtsProcLocks proc_locks = 0;
/* Bound processes are stuck... */
while (proc->bound_runq) {
proc = proc->next;
if (!proc)
goto end_of_proc;
}
mres = erts_proc_migrate(proc, &proc_locks,
evac_rq, &evac_rq_locked,
rq, &rq_locked);
if (mres == ERTS_MIGRATE_SUCCESS)
notify_to_rq = 1;
if (proc_locks)
erts_smp_proc_unlock(proc, proc_locks);
if (!evac_rq_locked) {
erts_smp_runq_lock(evac_rq);
evac_rq_locked = 1;
}
proc = evac_rq->procs.prio[prio].first;
}
end_of_proc:
#ifdef DEBUG
for (proc = evac_rq->procs.prio[prio].first;
proc;
proc = proc->next) {
ASSERT(proc->bound_runq);
}
#endif
break;
case PRIORITY_LOW:
break;
default:
ASSERT(!"Invalid process priority");
break;
}
}
if (rq_locked)
erts_smp_runq_unlock(rq);
if (evac_rq_locked)
erts_smp_runq_unlock(evac_rq);
if (notify_to_rq)
smp_notify_inc_runq(rq);
wake_scheduler(evac_rq, 0);
}
static int
try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq)
{
Process *proc;
int vrq_locked;
if (*rq_lockedp)
erts_smp_xrunq_lock(rq, vrq);
else
erts_smp_runq_lock(vrq);
vrq_locked = 1;
ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
if (rq->halt_in_progress)
goto try_steal_port;
/*
* Check for a runnable process to steal...
*/
switch (vrq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) {
case MAX_BIT:
case MAX_BIT|HIGH_BIT:
case MAX_BIT|NORMAL_BIT:
case MAX_BIT|LOW_BIT:
case MAX_BIT|HIGH_BIT|NORMAL_BIT:
case MAX_BIT|HIGH_BIT|LOW_BIT:
case MAX_BIT|NORMAL_BIT|LOW_BIT:
case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT:
for (proc = vrq->procs.prio[PRIORITY_MAX].last;
proc;
proc = proc->prev) {
if (!proc->bound_runq)
break;
}
if (proc)
break;
case HIGH_BIT:
case HIGH_BIT|NORMAL_BIT:
case HIGH_BIT|LOW_BIT:
case HIGH_BIT|NORMAL_BIT|LOW_BIT:
for (proc = vrq->procs.prio[PRIORITY_HIGH].last;
proc;
proc = proc->prev) {
if (!proc->bound_runq)
break;
}
if (proc)
break;
case NORMAL_BIT:
case LOW_BIT:
case NORMAL_BIT|LOW_BIT:
for (proc = vrq->procs.prio[PRIORITY_NORMAL].last;
proc;
proc = proc->prev) {
if (!proc->bound_runq)
break;
}
if (proc)
break;
case 0:
proc = NULL;
break;
default:
ASSERT(!"Invalid queue mask");
proc = NULL;
break;
}
if (proc) {
ErtsProcLocks proc_locks = 0;
int res;
ErtsMigrateResult mres;
mres = erts_proc_migrate(proc, &proc_locks,
vrq, &vrq_locked,
rq, rq_lockedp);
if (proc_locks)
erts_smp_proc_unlock(proc, proc_locks);
res = !0;
switch (mres) {
case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED:
res = 0;
case ERTS_MIGRATE_SUCCESS:
if (vrq_locked)
erts_smp_runq_unlock(vrq);
return res;
default: /* Other failures */
break;
}
}
ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
if (!vrq_locked) {
if (*rq_lockedp)
erts_smp_xrunq_lock(rq, vrq);
else
erts_smp_runq_lock(vrq);
vrq_locked = 1;
}
try_steal_port:
ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
/*
* Check for a runnable port to steal...
*/
if (vrq->ports.info.len) {
Port *prt = vrq->ports.end;
int prt_locked = 0;
int res;
ErtsMigrateResult mres;
mres = erts_port_migrate(prt, &prt_locked,
vrq, &vrq_locked,
rq, rq_lockedp);
if (prt_locked)
erts_smp_port_unlock(prt);
res = !0;
switch (mres) {
case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED:
res = 0;
case ERTS_MIGRATE_SUCCESS:
if (vrq_locked)
erts_smp_runq_unlock(vrq);
return res;
default: /* Other failures */
break;
}
}
if (vrq_locked)
erts_smp_runq_unlock(vrq);
return 0;
}
static ERTS_INLINE int
check_possible_steal_victim(ErtsRunQueue *rq, int *rq_lockedp, int vix)
{
ErtsRunQueue *vrq = ERTS_RUNQ_IX(vix);
erts_aint32_t iflgs = erts_smp_atomic32_read_nob(&vrq->info_flags);
if (iflgs & ERTS_RUNQ_IFLG_NONEMPTY)
return try_steal_task_from_victim(rq, rq_lockedp, vrq);
else
return 0;
}
static int
try_steal_task(ErtsRunQueue *rq)
{
int res, rq_locked, vix, active_rqs, blnc_rqs;
/*
* We are not allowed to steal jobs to this run queue
* if it is suspended. Note that it might get suspended
* at any time when we don't have the lock on the run
* queue.
*/
if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED)
return 0;
res = 0;
rq_locked = 1;
ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, rq_locked);
get_no_runqs(&active_rqs, &blnc_rqs);
if (active_rqs > blnc_rqs)
active_rqs = blnc_rqs;
if (rq->ix < active_rqs) {
/* First try to steal from an inactive run queue... */
if (active_rqs < blnc_rqs) {
int no = blnc_rqs - active_rqs;
int stop_ix = vix = active_rqs + rq->ix % no;
while (erts_smp_atomic32_read_acqb(&no_empty_run_queues) < blnc_rqs) {
res = check_possible_steal_victim(rq, &rq_locked, vix);
if (res)
goto done;
vix++;
if (vix >= blnc_rqs)
vix = active_rqs;
if (vix == stop_ix)
break;
}
}
vix = rq->ix;
/* ... then try to steal a job from another active queue... */
while (erts_smp_atomic32_read_acqb(&no_empty_run_queues) < blnc_rqs) {
vix++;
if (vix >= active_rqs)
vix = 0;
if (vix == rq->ix)
break;
res = check_possible_steal_victim(rq, &rq_locked, vix);
if (res)
goto done;
}
}
done:
if (!rq_locked)
erts_smp_runq_lock(rq);
if (!res)
res = rq->halt_in_progress ?
!ERTS_EMPTY_RUNQ_PORTS(rq) : !ERTS_EMPTY_RUNQ(rq);
return res;
}
/* Run queue balancing */
typedef struct {
Uint32 flags;
struct {
int max_len;
int avail;
int reds;
int migration_limit;
int emigrate_to;
int immigrate_from;
} prio[ERTS_NO_PRIO_LEVELS];
int reds;
int full_reds;
int full_reds_history_sum;
int full_reds_history_change;
int oowc;
int max_len;
} ErtsRunQueueBalance;
static ErtsRunQueueBalance *run_queue_info;
typedef struct {
int qix;
int len;
} ErtsRunQueueCompare;
static ErtsRunQueueCompare *run_queue_compare;
static int
rqc_len_cmp(const void *x, const void *y)
{
return ((ErtsRunQueueCompare *) x)->len - ((ErtsRunQueueCompare *) y)->len;
}
#define ERTS_PERCENT(X, Y) \
((Y) == 0 \
? ((X) == 0 ? 100 : INT_MAX) \
: ((100*(X))/(Y)))
#define ERTS_UPDATE_FULL_REDS(QIX, LAST_REDS) \
do { \
run_queue_info[(QIX)].full_reds \
= run_queue_info[(QIX)].full_reds_history_sum; \
run_queue_info[(QIX)].full_reds += (LAST_REDS); \
run_queue_info[(QIX)].full_reds \
>>= ERTS_FULL_REDS_HISTORY_AVG_SHFT; \
run_queue_info[(QIX)].full_reds_history_sum \
-= run_queue_info[(QIX)].full_reds_history_change; \
run_queue_info[(QIX)].full_reds_history_sum += (LAST_REDS); \
run_queue_info[(QIX)].full_reds_history_change = (LAST_REDS); \
} while (0)
#define ERTS_DBG_CHK_FULL_REDS_HISTORY(RQ) \
do { \
int sum__ = 0; \
int rix__; \
for (rix__ = 0; rix__ < ERTS_FULL_REDS_HISTORY_SIZE; rix__++) \
sum__ += (RQ)->full_reds_history[rix__]; \
ASSERT(sum__ == (RQ)->full_reds_history_sum); \
} while (0);
static void
check_balance(ErtsRunQueue *c_rq)
{
#if ERTS_MAX_PROCESSES >= (1 << 27)
# error check_balance() assumes ERTS_MAX_PROCESS < (1 << 27)
#endif
ErtsRunQueueBalance avg = {0};
Sint64 scheds_reds, full_scheds_reds;
int forced, active, current_active, oowc, half_full_scheds, full_scheds,
mmax_len, blnc_no_rqs, qix, pix, freds_hist_ix;
if (erts_smp_atomic32_xchg_nob(&balance_info.checking_balance, 1)) {
c_rq->check_balance_reds = INT_MAX;
return;
}
get_no_runqs(NULL, &blnc_no_rqs);
if (blnc_no_rqs == 1) {
c_rq->check_balance_reds = INT_MAX;
erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
return;
}
erts_smp_runq_unlock(c_rq);
if (balance_info.halftime) {
balance_info.halftime = 0;
erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
ERTS_FOREACH_RUNQ(rq,
{
if (rq->waiting)
rq->flags |= ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK;
else
rq->flags &= ~ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK;
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
});
erts_smp_runq_lock(c_rq);
return;
}
/*
* check_balance() is never called in more threads
* than one at a time, i.e., we will normally never
* get any conflicts on the balance_info.update_mtx.
* However, when blocking multi scheduling (which performance
* critical applications do *not* do) migration information
* is manipulated. Such updates of the migration information
* might clash with balancing.
*/
erts_smp_mtx_lock(&balance_info.update_mtx);
forced = balance_info.forced_check_balance;
balance_info.forced_check_balance = 0;
get_no_runqs(&current_active, &blnc_no_rqs);
if (blnc_no_rqs == 1) {
erts_smp_mtx_unlock(&balance_info.update_mtx);
erts_smp_runq_lock(c_rq);
c_rq->check_balance_reds = INT_MAX;
erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
return;
}
freds_hist_ix = balance_info.full_reds_history_index;
balance_info.full_reds_history_index++;
if (balance_info.full_reds_history_index >= ERTS_FULL_REDS_HISTORY_SIZE)
balance_info.full_reds_history_index = 0;
/* Read balance information for all run queues */
for (qix = 0; qix < blnc_no_rqs; qix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
erts_smp_runq_lock(rq);
run_queue_info[qix].flags = rq->flags;
for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
run_queue_info[qix].prio[pix].max_len
= rq->procs.prio_info[pix].max_len;
run_queue_info[qix].prio[pix].reds
= rq->procs.prio_info[pix].reds;
}
run_queue_info[qix].prio[ERTS_PORT_PRIO_LEVEL].max_len
= rq->ports.info.max_len;
run_queue_info[qix].prio[ERTS_PORT_PRIO_LEVEL].reds
= rq->ports.info.reds;
run_queue_info[qix].full_reds_history_sum
= rq->full_reds_history_sum;
run_queue_info[qix].full_reds_history_change
= rq->full_reds_history[freds_hist_ix];
run_queue_info[qix].oowc = rq->out_of_work_count;
run_queue_info[qix].max_len = rq->max_len;
rq->check_balance_reds = INT_MAX;
erts_smp_runq_unlock(rq);
}
full_scheds = 0;
half_full_scheds = 0;
full_scheds_reds = 0;
scheds_reds = 0;
oowc = 0;
mmax_len = 0;
/* Calculate availability for each priority in each run queues */
for (qix = 0; qix < blnc_no_rqs; qix++) {
int treds = 0;
if (run_queue_info[qix].flags & ERTS_RUNQ_FLG_OUT_OF_WORK) {
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
run_queue_info[qix].prio[pix].avail = 100;
treds += run_queue_info[qix].prio[pix].reds;
}
if (!(run_queue_info[qix].flags & ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK))
half_full_scheds++;
ERTS_UPDATE_FULL_REDS(qix, ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED);
}
else {
ASSERT(!(run_queue_info[qix].flags & ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++)
treds += run_queue_info[qix].prio[pix].reds;
if (treds == 0) {
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++)
run_queue_info[qix].prio[pix].avail = 0;
}
else {
Sint64 xreds = 0;
Sint64 procreds = treds;
procreds -=
((Sint64)
run_queue_info[qix].prio[ERTS_PORT_PRIO_LEVEL].reds);
for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
Sint64 av;
if (xreds == 0)
av = 100;
else if (procreds == xreds)
av = 0;
else {
av = (100*(procreds - xreds)) / procreds;
if (av == 0)
av = 1;
}
run_queue_info[qix].prio[pix].avail = (int) av;
ASSERT(run_queue_info[qix].prio[pix].avail >= 0);
if (pix < PRIORITY_NORMAL) /* ie., max or high */
xreds += (Sint64) run_queue_info[qix].prio[pix].reds;
}
run_queue_info[qix].prio[ERTS_PORT_PRIO_LEVEL].avail = 100;
}
ERTS_UPDATE_FULL_REDS(qix, treds);
full_scheds_reds += run_queue_info[qix].full_reds;
full_scheds++;
half_full_scheds++;
}
run_queue_info[qix].reds = treds;
scheds_reds += treds;
oowc += run_queue_info[qix].oowc;
if (mmax_len < run_queue_info[qix].max_len)
mmax_len = run_queue_info[qix].max_len;
}
if (!erts_sched_compact_load)
goto all_active;
if (!forced && half_full_scheds != blnc_no_rqs) {
int min = 1;
if (min < half_full_scheds)
min = half_full_scheds;
if (full_scheds) {
active = (scheds_reds - 1)/ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED+1;
}
else {
active = balance_info.last_active_runqs - 1;
}
if (balance_info.last_active_runqs < current_active) {
ERTS_BLNCE_SAVE_RISE(current_active, mmax_len, scheds_reds);
active = current_active;
}
else if (active < balance_info.prev_rise.active_runqs) {
if (ERTS_PERCENT(mmax_len,
balance_info.prev_rise.max_len) >= 90
&& ERTS_PERCENT(scheds_reds,
balance_info.prev_rise.reds) >= 90) {
active = balance_info.prev_rise.active_runqs;
}
}
if (active < min)
active = min;
else if (active > blnc_no_rqs)
active = blnc_no_rqs;
if (active == blnc_no_rqs)
goto all_active;
for (qix = 0; qix < active; qix++) {
run_queue_info[qix].flags = 0;
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
run_queue_info[qix].prio[pix].emigrate_to = -1;
run_queue_info[qix].prio[pix].immigrate_from = -1;
run_queue_info[qix].prio[pix].migration_limit = 0;
}
}
for (qix = active; qix < blnc_no_rqs; qix++) {
run_queue_info[qix].flags = ERTS_RUNQ_FLG_INACTIVE;
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
int tix = qix % active;
ERTS_SET_RUNQ_FLG_EMIGRATE(run_queue_info[qix].flags, pix);
run_queue_info[qix].prio[pix].emigrate_to = tix;
run_queue_info[qix].prio[pix].immigrate_from = -1;
run_queue_info[qix].prio[pix].migration_limit = 0;
}
}
}
else {
if (balance_info.last_active_runqs < current_active)
ERTS_BLNCE_SAVE_RISE(current_active, mmax_len, scheds_reds);
all_active:
active = blnc_no_rqs;
for (qix = 0; qix < blnc_no_rqs; qix++) {
if (full_scheds_reds > 0) {
/* Calculate availability compared to other schedulers */
if (!(run_queue_info[qix].flags & ERTS_RUNQ_FLG_OUT_OF_WORK)) {
Sint64 tmp = ((Sint64) run_queue_info[qix].full_reds
* (Sint64) full_scheds);
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
Sint64 avail = run_queue_info[qix].prio[pix].avail;
avail = (avail*tmp)/full_scheds_reds;
ASSERT(avail >= 0);
run_queue_info[qix].prio[pix].avail = (int) avail;
}
}
}
/* Calculate average max length */
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
run_queue_info[qix].prio[pix].emigrate_to = -1;
run_queue_info[qix].prio[pix].immigrate_from = -1;
avg.prio[pix].max_len += run_queue_info[qix].prio[pix].max_len;
avg.prio[pix].avail += run_queue_info[qix].prio[pix].avail;
}
}
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
int max_len = avg.prio[pix].max_len;
if (max_len != 0) {
int avail = avg.prio[pix].avail;
if (avail != 0) {
max_len = (int) ((100*((Sint64) max_len) - 1)
/ ((Sint64) avail)) + 1;
avg.prio[pix].max_len = max_len;
ASSERT(max_len >= 0);
}
}
}
/* Calculate migration limits for all priority queues in all
run queues */
for (qix = 0; qix < blnc_no_rqs; qix++) {
run_queue_info[qix].flags = 0; /* Reset for later use... */
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
int limit;
if (avg.prio[pix].max_len == 0
|| run_queue_info[qix].prio[pix].avail == 0)
limit = 0;
else
limit = (int) (((((Sint64) avg.prio[pix].max_len)
* ((Sint64) run_queue_info[qix].prio[pix].avail))
- 1)
/ 100 + 1);
run_queue_info[qix].prio[pix].migration_limit = limit;
}
}
/* Setup migration paths for all priorities */
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
int low = 0, high = 0;
for (qix = 0; qix < blnc_no_rqs; qix++) {
int len_diff = run_queue_info[qix].prio[pix].max_len;
len_diff -= run_queue_info[qix].prio[pix].migration_limit;
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "%d ", len_diff);
#endif
run_queue_compare[qix].qix = qix;
run_queue_compare[qix].len = len_diff;
if (len_diff != 0) {
if (len_diff < 0)
low++;
else
high++;
}
}
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "\n");
#endif
if (low && high) {
int from_qix;
int to_qix;
int eof = 0;
int eot = 0;
int tix = 0;
int fix = blnc_no_rqs-1;
qsort(run_queue_compare,
blnc_no_rqs,
sizeof(ErtsRunQueueCompare),
rqc_len_cmp);
while (1) {
if (run_queue_compare[fix].len <= 0)
eof = 1;
if (run_queue_compare[tix].len >= 0)
eot = 1;
if (eof || eot)
break;
from_qix = run_queue_compare[fix].qix;
to_qix = run_queue_compare[tix].qix;
if (run_queue_info[from_qix].prio[pix].avail == 0) {
ERTS_SET_RUNQ_FLG_EVACUATE(run_queue_info[from_qix].flags,
pix);
ERTS_SET_RUNQ_FLG_EVACUATE(run_queue_info[to_qix].flags,
pix);
}
ERTS_SET_RUNQ_FLG_EMIGRATE(run_queue_info[from_qix].flags, pix);
ERTS_SET_RUNQ_FLG_IMMIGRATE(run_queue_info[to_qix].flags, pix);
run_queue_info[from_qix].prio[pix].emigrate_to = to_qix;
run_queue_info[to_qix].prio[pix].immigrate_from = from_qix;
tix++;
fix--;
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "%d >--> %d\n", from_qix, to_qix);
#endif
}
if (!eot && eof) {
if (fix < blnc_no_rqs-1)
fix++;
if (run_queue_compare[fix].len > 0) {
int fix2 = -1;
while (tix < fix) {
if (run_queue_compare[tix].len >= 0)
break;
if (fix2 < fix)
fix2 = blnc_no_rqs-1;
from_qix = run_queue_compare[fix2].qix;
to_qix = run_queue_compare[tix].qix;
ASSERT(to_qix != from_qix);
if (run_queue_info[from_qix].prio[pix].avail == 0)
ERTS_SET_RUNQ_FLG_EVACUATE(run_queue_info[to_qix].flags,
pix);
ERTS_SET_RUNQ_FLG_IMMIGRATE(run_queue_info[to_qix].flags, pix);
run_queue_info[to_qix].prio[pix].immigrate_from = from_qix;
tix++;
fix2--;
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "%d --> %d\n", from_qix, to_qix);
#endif
}
}
}
else if (!eof && eot) {
if (tix > 0)
tix--;
if (run_queue_compare[tix].len < 0) {
int tix2 = 0;
while (tix < fix) {
if (run_queue_compare[fix].len <= 0)
break;
if (tix2 > tix)
tix2 = 0;
from_qix = run_queue_compare[fix].qix;
to_qix = run_queue_compare[tix2].qix;
ASSERT(to_qix != from_qix);
if (run_queue_info[from_qix].prio[pix].avail == 0)
ERTS_SET_RUNQ_FLG_EVACUATE(run_queue_info[from_qix].flags,
pix);
ERTS_SET_RUNQ_FLG_EMIGRATE(run_queue_info[from_qix].flags, pix);
run_queue_info[from_qix].prio[pix].emigrate_to = to_qix;
fix--;
tix2++;
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "%d >-- %d\n", from_qix, to_qix);
#endif
}
}
}
}
}
#ifdef DBG_PRINT
erts_fprintf(stderr, "--------------------------------\n");
#endif
}
balance_info.last_active_runqs = active;
set_no_active_runqs(active);
balance_info.halftime = 1;
erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
/* Write migration paths and reset balance statistics in all queues */
for (qix = 0; qix < blnc_no_rqs; qix++) {
int mqix;
Uint32 flags;
ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
ErtsRunQueueInfo *rqi;
flags = run_queue_info[qix].flags;
erts_smp_runq_lock(rq);
flags |= (rq->flags & ~ERTS_RUNQ_FLGS_MIGRATION_INFO);
ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK));
if (rq->waiting)
flags |= ERTS_RUNQ_FLG_OUT_OF_WORK;
rq->full_reds_history_sum
= run_queue_info[qix].full_reds_history_sum;
rq->full_reds_history[freds_hist_ix]
= run_queue_info[qix].full_reds_history_change;
ERTS_DBG_CHK_FULL_REDS_HISTORY(rq);
rq->out_of_work_count = 0;
rq->flags = flags;
rq->max_len = rq->len;
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
rqi = (pix == ERTS_PORT_PRIO_LEVEL
? &rq->ports.info
: &rq->procs.prio_info[pix]);
rqi->max_len = rqi->len;
rqi->reds = 0;
if (!(ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)
| ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix))) {
ASSERT(run_queue_info[qix].prio[pix].immigrate_from < 0);
ASSERT(run_queue_info[qix].prio[pix].emigrate_to < 0);
#ifdef DEBUG
rqi->migrate.limit.this = -1;
rqi->migrate.limit.other = -1;
ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x2);
#endif
}
else if (ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)) {
ASSERT(!ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix));
ASSERT(run_queue_info[qix].prio[pix].immigrate_from < 0);
ASSERT(run_queue_info[qix].prio[pix].emigrate_to >= 0);
mqix = run_queue_info[qix].prio[pix].emigrate_to;
rqi->migrate.limit.this
= run_queue_info[qix].prio[pix].migration_limit;
rqi->migrate.limit.other
= run_queue_info[mqix].prio[pix].migration_limit;
rqi->migrate.runq = ERTS_RUNQ_IX(mqix);
}
else {
ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix));
ASSERT(run_queue_info[qix].prio[pix].emigrate_to < 0);
ASSERT(run_queue_info[qix].prio[pix].immigrate_from >= 0);
mqix = run_queue_info[qix].prio[pix].immigrate_from;
rqi->migrate.limit.this
= run_queue_info[qix].prio[pix].migration_limit;
rqi->migrate.limit.other
= run_queue_info[mqix].prio[pix].migration_limit;
rqi->migrate.runq = ERTS_RUNQ_IX(mqix);
}
}
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
erts_smp_runq_unlock(rq);
}
balance_info.n++;
erts_smp_mtx_unlock(&balance_info.update_mtx);
erts_smp_runq_lock(c_rq);
}
#endif /* #ifdef ERTS_SMP */
Uint
erts_debug_nbalance(void)
{
#ifdef ERTS_SMP
Uint n;
erts_smp_mtx_lock(&balance_info.update_mtx);
n = balance_info.n;
erts_smp_mtx_unlock(&balance_info.update_mtx);
return n;
#else
return 0;
#endif
}
/* Wakeup other schedulers */
typedef enum {
ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_HIGH,
ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_HIGH,
ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_MEDIUM,
ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_LOW,
ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_LOW
} ErtsSchedWakeupOtherThreshold;
typedef enum {
ERTS_SCHED_WAKEUP_OTHER_TYPE_PROPOSAL,
ERTS_SCHED_WAKEUP_OTHER_TYPE_LEGACY
} ErtsSchedWakeupOtherType;
/* First proposal */
#define ERTS_WAKEUP_OTHER_LIMIT_VERY_HIGH (200*CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_HIGH (50*CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_MEDIUM (10*CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_LOW (CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_VERY_LOW (CONTEXT_REDS/10)
#define ERTS_WAKEUP_OTHER_DEC_SHIFT_VERY_HIGH 3
#define ERTS_WAKEUP_OTHER_DEC_SHIFT_HIGH 1
#define ERTS_WAKEUP_OTHER_DEC_SHIFT_MEDIUM 0
#define ERTS_WAKEUP_OTHER_DEC_SHIFT_LOW -2
#define ERTS_WAKEUP_OTHER_DEC_SHIFT_VERY_LOW -5
#define ERTS_WAKEUP_OTHER_DEC_SHIFT 2
#define ERTS_WAKEUP_OTHER_FIXED_INC (CONTEXT_REDS/10)
/* To be legacy */
#define ERTS_WAKEUP_OTHER_LIMIT_VERY_HIGH_LEGACY (200*CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_HIGH_LEGACY (50*CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_MEDIUM_LEGACY (10*CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_LOW_LEGACY (CONTEXT_REDS)
#define ERTS_WAKEUP_OTHER_LIMIT_VERY_LOW_LEGACY (CONTEXT_REDS/10)
#define ERTS_WAKEUP_OTHER_DEC_LEGACY 10
#define ERTS_WAKEUP_OTHER_FIXED_INC_LEGACY (CONTEXT_REDS/10)
#ifdef ERTS_SMP
static struct {
ErtsSchedWakeupOtherThreshold threshold;
ErtsSchedWakeupOtherType type;
int limit;
int dec_shift;
int dec_mask;
void (*check)(ErtsRunQueue *rq);
} wakeup_other;
static void
wakeup_other_check(ErtsRunQueue *rq)
{
int wo_reds = rq->wakeup_other_reds;
if (wo_reds) {
int left_len = rq->len - 1;
if (left_len < 1) {
int wo_reduce = wo_reds << wakeup_other.dec_shift;
wo_reduce &= wakeup_other.dec_mask;
rq->wakeup_other -= wo_reduce;
if (rq->wakeup_other < 0)
rq->wakeup_other = 0;
}
else {
rq->wakeup_other += (left_len*wo_reds
+ ERTS_WAKEUP_OTHER_FIXED_INC);
if (rq->wakeup_other > wakeup_other.limit) {
int empty_rqs =
erts_smp_atomic32_read_acqb(&no_empty_run_queues);
if (empty_rqs != 0)
wake_scheduler_on_empty_runq(rq);
rq->wakeup_other = 0;
}
}
rq->wakeup_other_reds = 0;
}
}
static void
wakeup_other_set_limit(void)
{
switch (wakeup_other.threshold) {
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_HIGH:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_VERY_HIGH;
wakeup_other.dec_shift = ERTS_WAKEUP_OTHER_DEC_SHIFT_VERY_HIGH;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_HIGH:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_HIGH;
wakeup_other.dec_shift = ERTS_WAKEUP_OTHER_DEC_SHIFT_HIGH;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_MEDIUM:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_MEDIUM;
wakeup_other.dec_shift = ERTS_WAKEUP_OTHER_DEC_SHIFT_MEDIUM;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_LOW:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_LOW;
wakeup_other.dec_shift = ERTS_WAKEUP_OTHER_DEC_SHIFT_LOW;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_LOW:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_VERY_LOW;
wakeup_other.dec_shift = ERTS_WAKEUP_OTHER_DEC_SHIFT_VERY_LOW;
break;
}
if (wakeup_other.dec_shift < 0)
wakeup_other.dec_mask = (1 << (sizeof(wakeup_other.dec_mask)*8
+ wakeup_other.dec_shift)) - 1;
else {
wakeup_other.dec_mask = 0;
wakeup_other.dec_mask = ~wakeup_other.dec_mask;
}
}
static void
wakeup_other_check_legacy(ErtsRunQueue *rq)
{
int wo_reds = rq->wakeup_other_reds;
if (wo_reds) {
if (rq->len < 2) {
rq->wakeup_other -= ERTS_WAKEUP_OTHER_DEC_LEGACY*wo_reds;
if (rq->wakeup_other < 0)
rq->wakeup_other = 0;
}
else if (rq->wakeup_other < wakeup_other.limit)
rq->wakeup_other += rq->len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC_LEGACY;
else {
if (erts_smp_atomic32_read_acqb(&no_empty_run_queues) != 0) {
wake_scheduler_on_empty_runq(rq);
rq->wakeup_other = 0;
}
rq->wakeup_other = 0;
}
}
rq->wakeup_other_reds = 0;
}
static void
wakeup_other_set_limit_legacy(void)
{
switch (wakeup_other.threshold) {
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_HIGH:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_VERY_HIGH_LEGACY;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_HIGH:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_HIGH_LEGACY;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_MEDIUM:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_MEDIUM_LEGACY;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_LOW:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_LOW_LEGACY;
break;
case ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_LOW:
wakeup_other.limit = ERTS_WAKEUP_OTHER_LIMIT_VERY_LOW_LEGACY;
break;
}
}
static void
set_wakeup_other_data(void)
{
switch (wakeup_other.type) {
case ERTS_SCHED_WAKEUP_OTHER_TYPE_PROPOSAL:
wakeup_other.check = wakeup_other_check;
wakeup_other_set_limit();
break;
case ERTS_SCHED_WAKEUP_OTHER_TYPE_LEGACY:
wakeup_other.check = wakeup_other_check_legacy;
wakeup_other_set_limit_legacy();
break;
}
}
#endif
void
erts_early_init_scheduling(int no_schedulers)
{
aux_work_timeout_early_init(no_schedulers);
#ifdef ERTS_SMP
wakeup_other.threshold = ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_MEDIUM;
wakeup_other.type = ERTS_SCHED_WAKEUP_OTHER_TYPE_LEGACY;
#endif
sched_busy_wait.sys_schedule = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM;
sched_busy_wait.tse = (ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM
* ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT);
sched_busy_wait.aux_work = (ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM
* ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_MEDIUM);
}
int
erts_sched_set_wakeup_other_thresold(char *str)
{
ErtsSchedWakeupOtherThreshold threshold;
if (sys_strcmp(str, "very_high") == 0)
threshold = ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_HIGH;
else if (sys_strcmp(str, "high") == 0)
threshold = ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_HIGH;
else if (sys_strcmp(str, "medium") == 0)
threshold = ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_MEDIUM;
else if (sys_strcmp(str, "low") == 0)
threshold = ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_LOW;
else if (sys_strcmp(str, "very_low") == 0)
threshold = ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_VERY_LOW;
else
return EINVAL;
#ifdef ERTS_SMP
wakeup_other.threshold = threshold;
set_wakeup_other_data();
#endif
return 0;
}
int
erts_sched_set_wakeup_other_type(char *str)
{
ErtsSchedWakeupOtherType type;
if (sys_strcmp(str, "proposal") == 0)
type = ERTS_SCHED_WAKEUP_OTHER_TYPE_PROPOSAL;
else if (sys_strcmp(str, "default") == 0)
type = ERTS_SCHED_WAKEUP_OTHER_TYPE_LEGACY;
else if (sys_strcmp(str, "legacy") == 0)
type = ERTS_SCHED_WAKEUP_OTHER_TYPE_LEGACY;
else
return EINVAL;
#ifdef ERTS_SMP
wakeup_other.type = type;
#endif
return 0;
}
int
erts_sched_set_busy_wait_threshold(char *str)
{
int sys_sched;
int aux_work_fact;
if (sys_strcmp(str, "very_long") == 0) {
sys_sched = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_VERY_LONG;
aux_work_fact = ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_VERY_LONG;
}
else if (sys_strcmp(str, "long") == 0) {
sys_sched = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_LONG;
aux_work_fact = ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_LONG;
}
else if (sys_strcmp(str, "medium") == 0) {
sys_sched = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM;
aux_work_fact = ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_MEDIUM;
}
else if (sys_strcmp(str, "short") == 0) {
sys_sched = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_SHORT;
aux_work_fact = ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_SHORT;
}
else if (sys_strcmp(str, "very_short") == 0) {
sys_sched = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_VERY_SHORT;
aux_work_fact = ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_VERY_SHORT;
}
else if (sys_strcmp(str, "none") == 0) {
sys_sched = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_NONE;
aux_work_fact = ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_NONE;
}
else {
return EINVAL;
}
sched_busy_wait.sys_schedule = sys_sched;
sched_busy_wait.tse = sys_sched*ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT;
sched_busy_wait.aux_work = sys_sched*aux_work_fact;
return 0;
}
static void
init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp)
{
awdp->sched_id = esdp ? (int) esdp->no : 0;
awdp->esdp = esdp;
awdp->ssi = esdp ? esdp->ssi : NULL;
#ifdef ERTS_SMP
awdp->misc.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
awdp->dd.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
awdp->dd.completed_callback = NULL;
awdp->dd.completed_arg = NULL;
#endif
#ifdef ERTS_USE_ASYNC_READY_Q
#ifdef ERTS_SMP
awdp->async_ready.need_thr_prgr = 0;
awdp->async_ready.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
#endif
awdp->async_ready.queue = NULL;
#endif
#ifdef ERTS_SMP
awdp->delayed_wakeup.next = ERTS_DELAYED_WAKEUP_INFINITY;
if (!dawwp) {
awdp->delayed_wakeup.job = NULL;
awdp->delayed_wakeup.sched2jix = NULL;
awdp->delayed_wakeup.jix = -1;
}
else {
int i;
awdp->delayed_wakeup.job = (ErtsDelayedAuxWorkWakeupJob *) dawwp;
dawwp += sizeof(ErtsDelayedAuxWorkWakeupJob)*(erts_no_schedulers+1);
awdp->delayed_wakeup.sched2jix = (int *) dawwp;
awdp->delayed_wakeup.jix = -1;
for (i = 0; i <= erts_no_schedulers; i++)
awdp->delayed_wakeup.sched2jix[i] = -1;
}
#endif
}
void
erts_init_scheduling(int no_schedulers, int no_schedulers_online)
{
int ix, n, no_ssi;
char *daww_ptr;
#ifdef ERTS_SMP
size_t daww_sz;
#endif
init_misc_op_list_alloc();
#ifdef ERTS_SMP
set_wakeup_other_data();
#endif
ASSERT(no_schedulers_online <= no_schedulers);
ASSERT(no_schedulers_online >= 1);
ASSERT(no_schedulers >= 1);
/* Create and initialize run queues */
n = no_schedulers;
erts_aligned_run_queues =
erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS,
sizeof(ErtsAlignedRunQueue) * n);
#ifdef ERTS_SMP
erts_smp_atomic32_init_nob(&no_empty_run_queues, 0);
#endif
erts_no_run_queues = n;
for (ix = 0; ix < n; ix++) {
int pix, rix;
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
rq->ix = ix;
erts_smp_atomic32_init_nob(&rq->info_flags, ERTS_RUNQ_IFLG_NONEMPTY);
/* make sure that the "extra" id correponds to the schedulers
* id if the esdp->no <-> ix+1 mapping change.
*/
erts_smp_mtx_init_x(&rq->mtx, "run_queue", make_small(ix + 1));
erts_smp_cnd_init(&rq->cnd);
rq->waiting = 0;
rq->woken = 0;
rq->flags = 0;
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
rq->full_reds_history_sum = 0;
for (rix = 0; rix < ERTS_FULL_REDS_HISTORY_SIZE; rix++) {
rq->full_reds_history_sum += ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED;
rq->full_reds_history[rix] = ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED;
}
rq->out_of_work_count = 0;
rq->max_len = 0;
rq->len = 0;
rq->wakeup_other = 0;
rq->wakeup_other_reds = 0;
rq->halt_in_progress = 0;
rq->procs.len = 0;
rq->procs.pending_exiters = NULL;
rq->procs.context_switches = 0;
rq->procs.reductions = 0;
for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
rq->procs.prio_info[pix].len = 0;
rq->procs.prio_info[pix].max_len = 0;
rq->procs.prio_info[pix].reds = 0;
rq->procs.prio_info[pix].migrate.limit.this = 0;
rq->procs.prio_info[pix].migrate.limit.other = 0;
ERTS_DBG_SET_INVALID_RUNQP(rq->procs.prio_info[pix].migrate.runq,
0x0);
if (pix < ERTS_NO_PROC_PRIO_LEVELS - 1) {
rq->procs.prio[pix].first = NULL;
rq->procs.prio[pix].last = NULL;
}
}
rq->misc.start = NULL;
rq->misc.end = NULL;
rq->misc.evac_runq = NULL;
rq->ports.info.len = 0;
rq->ports.info.max_len = 0;
rq->ports.info.reds = 0;
rq->ports.info.migrate.limit.this = 0;
rq->ports.info.migrate.limit.other = 0;
rq->ports.info.migrate.runq = NULL;
rq->ports.start = NULL;
rq->ports.end = NULL;
}
#ifdef ERTS_SMP
if (erts_no_run_queues != 1) {
run_queue_info = erts_alloc(ERTS_ALC_T_RUNQ_BLNS,
(sizeof(ErtsRunQueueBalance)
* erts_no_run_queues));
run_queue_compare = erts_alloc(ERTS_ALC_T_RUNQ_BLNS,
(sizeof(ErtsRunQueueCompare)
* erts_no_run_queues));
}
#endif
n = (int) no_schedulers;
erts_no_schedulers = n;
/* Create and initialize scheduler sleep info */
#ifdef ERTS_SMP
no_ssi = n+1;
#else
no_ssi = 1;
#endif
aligned_sched_sleep_info =
erts_alloc_permanent_cache_aligned(
ERTS_ALC_T_SCHDLR_SLP_INFO,
no_ssi*sizeof(ErtsAlignedSchedulerSleepInfo));
for (ix = 0; ix < no_ssi; ix++) {
ErtsSchedulerSleepInfo *ssi = &aligned_sched_sleep_info[ix].ssi;
#ifdef ERTS_SMP
#if 0 /* no need to initialize these... */
ssi->next = NULL;
ssi->prev = NULL;
#endif
erts_smp_atomic32_init_nob(&ssi->flags, 0);
ssi->event = NULL; /* initialized in sched_thread_func */
#endif
erts_atomic32_init_nob(&ssi->aux_work, 0);
}
#ifdef ERTS_SMP
aligned_sched_sleep_info++;
#endif
/* Create and initialize scheduler specific data */
#ifdef ERTS_SMP
daww_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE((sizeof(ErtsDelayedAuxWorkWakeupJob)
+ sizeof(int))*(n+1));
daww_ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_SCHDLR_DATA,
daww_sz*n);
#else
daww_ptr = NULL;
#endif
erts_aligned_scheduler_data =
erts_alloc_permanent_cache_aligned(ERTS_ALC_T_SCHDLR_DATA,
n*sizeof(ErtsAlignedSchedulerData));
for (ix = 0; ix < n; ix++) {
ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(ix);
#ifdef ERTS_SMP
erts_bits_init_state(&esdp->erl_bits_state);
esdp->match_pseudo_process = NULL;
esdp->free_process = NULL;
#endif
esdp->x_reg_array =
erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
ERTS_X_REGS_ALLOCATED *
sizeof(Eterm));
esdp->f_reg_array =
erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
MAX_REG * sizeof(FloatDef));
#if !HEAP_ON_C_STACK
esdp->num_tmp_heap_used = 0;
#endif
esdp->no = (Uint) ix+1;
esdp->ssi = ERTS_SCHED_SLEEP_INFO_IX(ix);
esdp->current_process = NULL;
esdp->current_port = NULL;
esdp->virtual_reds = 0;
esdp->cpu_id = -1;
erts_init_atom_cache_map(&esdp->atom_cache_map);
esdp->run_queue = ERTS_RUNQ_IX(ix);
esdp->run_queue->scheduler = esdp;
init_aux_work_data(&esdp->aux_work_data, esdp, daww_ptr);
#ifdef ERTS_SMP
daww_ptr += daww_sz;
#endif
esdp->reductions = 0;
init_sched_wall_time(&esdp->sched_wall_time);
}
init_misc_aux_work();
#if !HALFWORD_HEAP
init_swtreq_alloc();
#endif
#ifdef ERTS_SMP
erts_atomic32_init_nob(&completed_dealloc_count, 0); /* debug only */
aux_thread_aux_work_data =
erts_alloc_permanent_cache_aligned(ERTS_ALC_T_SCHDLR_DATA,
sizeof(ErtsAuxWorkData));
erts_smp_mtx_init(&schdlr_sspnd.mtx, "schdlr_sspnd");
erts_smp_cnd_init(&schdlr_sspnd.cnd);
erts_smp_atomic32_init_nob(&schdlr_sspnd.changing, 0);
schdlr_sspnd.online = no_schedulers_online;
schdlr_sspnd.curr_online = no_schedulers;
schdlr_sspnd.msb.ongoing = 0;
erts_smp_atomic32_init_nob(&schdlr_sspnd.active, no_schedulers);
schdlr_sspnd.msb.procs = NULL;
init_no_runqs(no_schedulers, no_schedulers_online);
balance_info.last_active_runqs = no_schedulers;
erts_smp_mtx_init(&balance_info.update_mtx, "migration_info_update");
balance_info.forced_check_balance = 0;
balance_info.halftime = 1;
balance_info.full_reds_history_index = 0;
erts_smp_atomic32_init_nob(&balance_info.checking_balance, 0);
balance_info.prev_rise.active_runqs = 0;
balance_info.prev_rise.max_len = 0;
balance_info.prev_rise.reds = 0;
balance_info.n = 0;
if (no_schedulers_online < no_schedulers) {
for (ix = no_schedulers_online; ix < erts_no_run_queues; ix++)
evacuate_run_queue(ERTS_RUNQ_IX(ix),
ERTS_RUNQ_IX(ix % no_schedulers_online));
}
schdlr_sspnd.wait_curr_online = no_schedulers_online;
schdlr_sspnd.curr_online *= 2; /* Boot strapping... */
ERTS_SCHDLR_SSPND_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_ONLN
| ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
erts_smp_atomic32_init_nob(&doing_sys_schedule, 0);
init_misc_aux_work();
#else /* !ERTS_SMP */
{
ErtsSchedulerData *esdp;
esdp = ERTS_SCHEDULER_IX(0);
erts_scheduler_data = esdp;