Adaptive GC Threading
**_For background and results see
https://github.com/eclipse/omr/issues/5829_**

> Using overhead data (busy/stall times for managing and synchronizing
threads), Adaptive Threading aims to identify sub-optimal/detrimental
parallelism and continuously adjusts the GC thread count to reach
optimal parallelism.

_Adaptive Threading changes are implemented at the various phases of GC
as follows, during:_

1. **Pre-collection _(during task/thread dispatch)_:** to adjust
thread count based on the previous cycle’s recommendation
2. **Collection:** to gather data on parallelization overhead for the
running cycle
3. **Post-collection _(after worker threads are suspended)_:** to
project/calculate the optimal thread count and give a recommendation
for the next cycle based on the GC that has just completed

Adaptive threading is enabled by default. The user may enable or
disable it through the `-XX:[+-]AdaptiveGCThreading` options. However,
Adaptive Threading is ignored, even if it is enabled, when the GC thread
count is forced (e.g. the user specifies `-Xgcthreads`). The user can
also specify an upper thread limit for adaptive threading using the
`-Xgcmaxthreads` option.
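
The enable/forced interaction described above can be sketched as follows. This is a minimal, self-contained illustration: `GcOptions` is a hypothetical stand-in for the two relevant `MM_GCExtensionsBase` fields, not the real class.

```cpp
#include <cassert>

// Hypothetical stand-in for the two MM_GCExtensionsBase fields involved.
struct GcOptions {
	bool adaptiveGCThreading = true;   // on by default, toggled by the -XX option
	bool gcThreadCountForced = false;  // true when the user forces a GC thread count
};

// Adaptive threading may proceed only when it is enabled AND the
// thread count has not been forced by the user.
inline bool adaptiveThreadingEnabled(const GcOptions &options)
{
	return options.adaptiveGCThreading && !options.gcThreadCountForced;
}
```

With the defaults the check passes; forcing a thread count disables it regardless of the enable flag.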

_The specifics of the implementation are as follows:_

 - The thread count associated with a parallel task instance indicates
the projected optimal number of threads to complete the task.
 - It drives Adaptive Threading: it changes with each dispatch of the
task, based on observations made when previously completing the same
task.
 - Currently, adaptive threading is only used with _Scavenger_, hence
this only applies to tasks of type `MM_ParallelScavengeTask`: a
recommended thread count is provided with each new
`ParallelScavengeTask` instance (i.e. when a scavenge is run).
 - `getRecommendedWorkingThreads` is introduced to the `MM_Task` base
class. The base implementation returns `UDATA_MAX` (signifying no
adaptive threading) and is overridden by `ParallelScavengeTask` to
return the adaptive thread count.
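
The sentinel-plus-override pattern described in the last item can be sketched in isolation. The class names `Task` and `ScavengeTask` and the constant `kNoRecommendation` are hypothetical simplifications; the constant assumes `UDATA_MAX` is the largest `uintptr_t` value.

```cpp
#include <cassert>
#include <cstdint>

// ASSUMPTION: stands in for OMR's UDATA_MAX (largest uintptr_t value).
static const uintptr_t kNoRecommendation = UINTPTR_MAX;

// Simplified base task: by default, no thread count is recommended.
class Task {
public:
	virtual ~Task() {}
	virtual uintptr_t getRecommendedWorkingThreads() { return kNoRecommendation; }
};

// Simplified scavenge task: carries the count recommended at construction.
class ScavengeTask : public Task {
	uintptr_t _recommendedThreads;
public:
	explicit ScavengeTask(uintptr_t recommendedThreads)
		: _recommendedThreads(recommendedThreads) {}
	virtual uintptr_t getRecommendedWorkingThreads() { return _recommendedThreads; }
};
```

A caller holding only a `Task *` can then distinguish "no recommendation" from a real count by comparing against the sentinel.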

during collection):
- `_workerScavengeStartTime` and `_workerScavengeEndTime`
  - Thread-local task (scavenge) start/end timestamps taken when a worker
thread starts/completes a task (scavenge). These are used to determine
the time it takes a worker to start the collection task from the time a
cycle starts, and how long a worker waits for others once it has
completed its task.
- `_adjustedSyncStallTime`
  - Similar to the existing `_SyncStallTime` stat **except** it accounts
for Critical Section duration. That is, it subtracts the critical section
duration from the stall time of a synced thread. This is because the
stall time added by a critical section is independent of the number of
threads being synchronized. This independent stall time cannot be
adjusted for, hence it must be ignored. This is relevant for the
`syncAndReleaseSingle/Master` APIs as they are the only APIs that
record a stall time with critical sections; without these,
`_SyncStallTime` == `_adjustedSyncStallTime`.
  - The existing `addToSyncStallTime` method is extended to update
`_adjustedSyncStallTime` in addition to `_SyncStallTime`.
`addToSyncStallTime` now takes `criticalSectionDuration` (defaults to
0) as an input; a value is passed for it when a critical section is
executed prior to updating stall stats.
 - `_notifyStallTime`
   - Used to record the stall times resulting from notifying other
waiting threads. Note that this stat is not exhaustive: it only records
notify times relevant to adaptive threading.
   - We are more concerned with recording 'notify_all' than
'notify_one', as the cost of 'notify_all' depends on the number of
threads being notified.
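
The relationship between the raw and adjusted sync stall stats can be sketched as below. `StallStats` is a hypothetical, simplified stand-in for the scavenger stats structure, and the underflow guard is an assumption made for this sketch (the actual change asserts that the stall is at least the critical section duration).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for the per-thread scavenger stats.
struct StallStats {
	uint64_t syncStallTime = 0;
	uint64_t adjustedSyncStallTime = 0;

	// Record one sync stall. criticalSectionDuration defaults to 0, in which
	// case both stats grow by the same amount.
	void addToSyncStallTime(uint64_t startTime, uint64_t endTime, uint64_t criticalSectionDuration = 0)
	{
		const uint64_t stall = endTime - startTime;
		syncStallTime += stall;
		// ASSUMPTION: clamp instead of asserting, to avoid unsigned underflow.
		adjustedSyncStallTime += (stall >= criticalSectionDuration) ? (stall - criticalSectionDuration) : 0;
	}
};
```

For example, a 60-tick stall that overlapped a 40-tick critical section contributes 60 ticks to the raw stat but only 20 to the adjusted one.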

- Introduced `calculateRecommendedWorkingThreads` routine
  - Called at the end of each successful scavenge to project the optimal
thread count for the next scavenge
  - Implements the Adaptive Model _(see background for more info)_
  - Calculates averages of stall stats and uses them as inputs to the
adaptive model; the projected optimal thread count is stored in the new
member `_recommendedThreads`.
  - Timestamps taken for worker thread scavenge start/end time
 - `notifyStallTime` recorded for `notify_all` on `_scanCacheMonitor`
_(some instances are ignored as they are not relevant to adaptive
threading, such as backout cases)_
- `MergeThreadGCStats` updated to merge new scavenger stats (discussed
above)
  - Trace point added here to break down thread-local stats for adaptive
threading
for Adaptive Threading
- Dispatcher now queries the task for the recommended thread count when
determining the number of threads to release from the thread pool to
complete the task
  - Ensures the thread count is properly bounded to respect the max
thread count: either the user-provided adaptiveThreadCount or the
default max.
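
The bounding step can be sketched as a pure function. `boundThreadCount` and `kUdataMax` are hypothetical names for this illustration; the logic mirrors the dispatcher hunk in this commit, where a recommendation replaces the default count and is capped at the maximum thread count.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// ASSUMPTION: stands in for OMR's UDATA_MAX ("no recommendation").
static const uintptr_t kUdataMax = UINTPTR_MAX;

// Pick the thread count for a task. Without a recommendation, use the smaller
// of the active and requested counts; with one, use it, capped at the maximum.
inline uintptr_t boundThreadCount(uintptr_t activeThreadCount, uintptr_t requestedThreadCount,
	uintptr_t maxThreadCount, uintptr_t recommendedThreads)
{
	uintptr_t count = std::min(activeThreadCount, requestedThreadCount);
	if (recommendedThreads != kUdataMax) {
		count = std::min(maxThreadCount, recommendedThreads);
	}
	return count;
}
```

Note that a recommendation may raise the count above the currently active count, as long as it stays within the maximum.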

- `_syncCriticalSectionStartTime` & `_syncCriticalSectionDuration`
introduced to record Critical Section times for adjusted stall time
  - `_syncCriticalSectionStartTime` is recorded when all threads are
synced and the released thread is about to exit the sync API to execute
the critical section
  - `_syncCriticalSectionDuration` is recorded when the thread executing
the critical section is about to release the synced threads (indicating
the critical section is complete). As synced threads are released, they
update their stall time with the newly set critical section duration.
- `notifyStallTime` recorded for `notify_all` on synced threads
Signed-off-by: Salman Rana <salman.rana@ibm.com>
RSalman committed Mar 12, 2021
1 parent 94012c0 commit 403d231
Showing 13 changed files with 253 additions and 12 deletions.
21 changes: 21 additions & 0 deletions gc/base/GCExtensionsBase.hpp
@@ -517,6 +517,13 @@ class MM_GCExtensionsBase : public MM_BaseVirtual {
bool enableSplitHeap; /**< true if we are using gencon with -Xgc:splitheap (we will fail to bootstrap if we can't allocate both ranges) */
double aliasInhibitingThresholdPercentage; /**< percentage of threads that can be blocked before copy cache aliasing is inhibited (set through aliasInhibitingThresholdPercentage=) */

/* Start of variables relating to Adaptive Threading */
bool adaptiveGCThreading; /**< Flag to indicate whether the Scavenger Adaptive Threading Optimization is enabled*/
float adaptiveThreadingSensitivityFactor; /**< Used by Adaptive Model to determine sensitivity/tolerance to stalling, higher number translates to less stall being tolerated (set through adaptiveThreadingSensitivityFactor=) */
float adaptiveThreadingWeightActiveThreads; /**< Weight given to current active threads when averaging projected threads with current active threads (set through adaptiveThreadingWeightActiveThreads=) */
float adaptiveThreadBooster; /**< Used to boost calculated thread count, gives opportunity for low thread count to grow. */
/* End of variables relating to Adaptive Threading */

enum HeapInitializationSplitHeapSection {
HEAP_INITIALIZATION_SPLIT_HEAP_UNKNOWN = 0,
HEAP_INITIALIZATION_SPLIT_HEAP_TENURE,
@@ -1093,6 +1100,16 @@ class MM_GCExtensionsBase : public MM_BaseVirtual {
MMINLINE void setConcurrentGlobalGCInProgress(bool inProgress) { _concurrentGlobalGCInProgress = inProgress; }
#endif /* OMR_GC_MODRON_SCAVENGER */

/**
* Determine whether Adaptive Threading is enabled. AdaptiveGCThreading flag
* is not sufficient; Adaptive threading must be ignored if GC thread count is forced.
* @return TRUE if adaptive threading routines can proceed, FALSE otherwise
*/
MMINLINE bool adaptiveThreadigEnabled()
{
return (adaptiveGCThreading && !gcThreadCountForced);
}

/**
* Returns TRUE if an object is old, FALSE otherwise.
* @param objectPtr Pointer to an object
@@ -1595,6 +1612,10 @@ class MM_GCExtensionsBase : public MM_BaseVirtual {
, dnssMinimumContraction(0.0)
, enableSplitHeap(false)
, aliasInhibitingThresholdPercentage(0.20)
, adaptiveGCThreading(true)
, adaptiveThreadingSensitivityFactor(1.0f)
, adaptiveThreadingWeightActiveThreads(0.50f)
, adaptiveThreadBooster(0.85f)
, splitHeapSection(HEAP_INITIALIZATION_SPLIT_HEAP_UNKNOWN)
#endif /* OMR_GC_MODRON_SCAVENGER */
, globalMaximumContraction(0.05) /* by default, contract must be at most 5% of the committed heap */
18 changes: 18 additions & 0 deletions gc/base/ParallelDispatcher.cpp
@@ -457,6 +457,24 @@ MM_ParallelDispatcher::recomputeActiveThreadCountForTask(MM_EnvironmentBase *env
* available and ready to run).
*/
uintptr_t taskActiveThreadCount = OMR_MIN(_activeThreadCount, threadCount);

/* Account for Adaptive Threading. RecommendedWorkingThreads will not be set (will return UDATA_MAX) if:
*
* 1) User forced a thread count (e.g. -Xgcthreads)
* 2) Adaptive threading flag is not set (-XX:-AdaptiveGCThreading)
* 3) or simply the task wasn't recommended a thread count (currently only recommended for STW Scavenge Tasks)
*/
if (task->getRecommendedWorkingThreads() != UDATA_MAX) {
/* Bound the recommended thread count. Determine the upper bound for the thread count.
* This will be either the user-specified gcMaxThreadCount (-XgcmaxthreadsN) or else the default max.
*/
taskActiveThreadCount = OMR_MIN(_threadCount, task->getRecommendedWorkingThreads());

_activeThreadCount = taskActiveThreadCount;

Trc_MM_ParallelDispatcher_recomputeActiveThreadCountForTask_useCollectorRecommendedThreads(task->getRecommendedWorkingThreads(), taskActiveThreadCount);
}

task->setThreadCount(taskActiveThreadCount);
return taskActiveThreadCount;
}
12 changes: 12 additions & 0 deletions gc/base/ParallelTask.cpp
@@ -214,6 +214,14 @@ MM_ParallelTask::synchronizeGCThreadsAndReleaseSingleThread(MM_EnvironmentBase *
void
MM_ParallelTask::releaseSynchronizedGCThreads(MM_EnvironmentBase *env)
{
OMRPORT_ACCESS_FROM_OMRPORT(env->getPortLibrary());

if (_syncCriticalSectionStartTime != 0) {
/* Critical section complete, synced threads are about to be released. Record the duration. */
_syncCriticalSectionDuration = (omrtime_hires_clock() - _syncCriticalSectionStartTime);
_syncCriticalSectionStartTime = 0;
}

if (1 == _totalThreadCount) {
_synchronized = false;
return;
@@ -225,7 +233,11 @@ MM_ParallelTask::releaseSynchronizedGCThreads(MM_EnvironmentBase *env)
omrthread_monitor_enter(_synchronizeMutex);
_synchronizeCount = 0;
_synchronizeIndex += 1;
uint64_t notifyStartTime = omrtime_hires_clock();
omrthread_monitor_notify_all(_synchronizeMutex);

addToNotifyStallTime(env, notifyStartTime, omrtime_hires_clock());

omrthread_monitor_exit(_synchronizeMutex);
}

6 changes: 6 additions & 0 deletions gc/base/ParallelTask.hpp
@@ -48,6 +48,9 @@ class MM_ParallelTask : public MM_Task
*/
private:
protected:
uint64_t _syncCriticalSectionStartTime; /**< Timestamp taken when a critical section of the task starts execution. */
uint64_t _syncCriticalSectionDuration; /**< The time, in hi-res ticks, it took to execute the latest critical section. */

bool _synchronized;
const char *_syncPointUniqueId;
uintptr_t _syncPointWorkUnitIndex; /**< The _workUnitIndex of the first thread to sync. All threads should have the same index once sync'ed. */
@@ -84,6 +87,7 @@ class MM_ParallelTask : public MM_Task
_totalThreadCount = threadCount;
}
MMINLINE virtual uintptr_t getThreadCount() { return _totalThreadCount; }
MMINLINE virtual void addToNotifyStallTime(MM_EnvironmentBase *env, uint64_t startTime, uint64_t endTime) {}

virtual bool isSynchronized();

@@ -92,6 +96,8 @@ class MM_ParallelTask : public MM_Task
*/
MM_ParallelTask(MM_EnvironmentBase *env, MM_ParallelDispatcher *dispatcher) :
MM_Task(env, dispatcher)
,_syncCriticalSectionStartTime(0)
,_syncCriticalSectionDuration(0)
,_synchronized(false)
,_syncPointUniqueId(NULL)
,_syncPointWorkUnitIndex(0)
2 changes: 2 additions & 0 deletions gc/base/Task.hpp
@@ -65,6 +65,8 @@ class MM_Task : public MM_BaseVirtual
virtual void run(MM_EnvironmentBase *env) = 0;
virtual void cleanup(MM_EnvironmentBase *env);

virtual uintptr_t getRecommendedWorkingThreads() { return UDATA_MAX; }

/**
* Single call setup routine for tasks invoked by the main thread before the task is dispatched.
*/
7 changes: 7 additions & 0 deletions gc/base/j9mm.tdf
@@ -928,3 +928,10 @@ TraceEvent=Trc_MM_Scavenger_percolate_delegate Overhead=1 Level=1 Group=percolat

TraceEntry=Trc_MM_MemorySubSpaceUniSpace_getHeapFreeMaximumHeuristicMultiplier Overhead=1 Level=1 Group=resize Template="Trc_MM_MemorySubSpaceUniSpace_getHeapFreeMaximumHeuristicMultiplier Maximum free multiplier = %zu"
TraceEntry=Trc_MM_MemorySubSpaceUniSpace_getHeapFreeMinimumHeuristicMultiplier Overhead=1 Level=1 Group=resize Template="Trc_MM_MemorySubSpaceUniSpace_getHeapFreeMinimumHeuristicMultiplier Minimum free multiplier = %zu"

TraceEntry=Trc_MM_Scavenger_calculateRecommendedWorkingThreads_entry Overhead=1 Level=1 Group=adaptivethread Template="[%u] MM_Scavenger_calculateRecommendedWorkingThreads entry"
TraceExit=Trc_MM_Scavenger_calculateRecommendedWorkingThreads_exitOverflow Overhead=1 Level=1 Group=adaptivethread Template="Scavenge ignored for recommending threads (Irregular stalling with SyncAndReleaseMaster)"
TraceExit=Trc_MM_Scavenger_calculateRecommendedWorkingThreads_setRecommendedThreads Overhead=1 Level=1 Group=adaptivethread Template="ScavengeTime: %5llu Avg.StallTime: %5llu (%.2f%%) Threads [Current: %2u Ideal: %.2f Weighted: %.2f Boosted: %.2f Recommend: %2u]"
TraceEvent=Trc_MM_Scavenger_calculateRecommendedWorkingThreads_averageStallBreakDown Overhead=1 Level=1 Group=adaptivethread Template="Average for: %u threads -> avgTimeToStartCollection: %5llu avgTimeIdleAfterCollection: %5llu avgScanStallTime: %5llu avgSyncStallTime: %5llu avgNotifyStallTime: %5llu"
TraceEvent=Trc_MM_Scavenger_calculateRecommendedWorkingThreads_threadStallBreakDown Overhead=1 Level=1 Group=adaptivethread Template="Thread: %2u -> timeToStartCollection: %5llu scanStall: %5llu syncStall: %5llu notifyStall: %5llu"
TraceEvent=Trc_MM_ParallelDispatcher_recomputeActiveThreadCountForTask_useCollectorRecommendedThreads noEnv Overhead=1 Level=1 Group=adaptivethread Template="Attempting to use Task Recommended Threads: %u -> Adjusting to Bounds: %u"
2 changes: 1 addition & 1 deletion gc/base/standard/ConcurrentScavengeTask.hpp
@@ -69,7 +69,7 @@ class MM_ConcurrentScavengeTask : public MM_ParallelScavengeTask
MM_Scavenger *scavenger,
ConcurrentAction action,
MM_CycleState *cycleState) :
- MM_ParallelScavengeTask(env, dispatcher, scavenger, cycleState)
+ MM_ParallelScavengeTask(env, dispatcher, scavenger, cycleState, UDATA_MAX)
, _bytesScanned(0)
, _action(action)
{
26 changes: 24 additions & 2 deletions gc/base/standard/ParallelScavengeTask.cpp
@@ -88,7 +88,15 @@ MM_ParallelScavengeTask::synchronizeGCThreadsAndReleaseMain(MM_EnvironmentBase *
bool result = MM_ParallelTask::synchronizeGCThreadsAndReleaseMain(env, id);
uint64_t endTime = omrtime_hires_clock();

- env->_scavengerStats.addToSyncStallTime(startTime, endTime);
+ if (result) {
+ /* Released thread will exit and execute critical section, record the start time */
+ _syncCriticalSectionStartTime = endTime;
+ _syncCriticalSectionDuration = 0;
+ }
+
+ /* _syncCriticalSectionDuration must be set now, this thread's stall time is at least the duration of critical section */
+ Assert_MM_true((endTime - startTime) >= _syncCriticalSectionDuration);
+ env->_scavengerStats.addToSyncStallTime(startTime, endTime, _syncCriticalSectionDuration);

return result;
}
@@ -102,11 +110,25 @@ MM_ParallelScavengeTask::synchronizeGCThreadsAndReleaseSingleThread(MM_Environme
bool result = MM_ParallelTask::synchronizeGCThreadsAndReleaseSingleThread(env, id);
uint64_t endTime = omrtime_hires_clock();

- env->_scavengerStats.addToSyncStallTime(startTime, endTime);
+ if (result) {
+ /* Released thread will exit and execute critical section, record the start time */
+ _syncCriticalSectionStartTime = endTime;
+ _syncCriticalSectionDuration = 0;
+ }
+
+ /* _syncCriticalSectionDuration must be set now, this thread's stall time is at least the duration of critical section */
+ Assert_MM_true((endTime - startTime) >= _syncCriticalSectionDuration);
+ env->_scavengerStats.addToSyncStallTime(startTime, endTime, _syncCriticalSectionDuration);

return result;
}

void
MM_ParallelScavengeTask::addToNotifyStallTime(MM_EnvironmentBase *env, uint64_t startTime, uint64_t endTime)
{
env->_scavengerStats.addToNotifyStallTime(startTime, endTime);
}

#endif /* J9MODRON_TGC_PARALLEL_STATISTICS */

#endif /* defined(OMR_GC_MODRON_SCAVENGER) */
8 changes: 7 additions & 1 deletion gc/base/standard/ParallelScavengeTask.hpp
@@ -50,6 +50,7 @@ class MM_ParallelScavengeTask : public MM_ParallelTask
protected:
MM_Scavenger *_collector;
MM_CycleState *_cycleState; /**< Collection cycle state active for the task */
uintptr_t _recommendedThreads; /**< Collector recommended threads for the task */

public:
virtual UDATA getVMStateID() { return OMRVMSTATE_GC_SCAVENGE; };
@@ -77,15 +78,20 @@ class MM_ParallelScavengeTask : public MM_ParallelTask
* @see MM_ParallelTask::synchronizeGCThreadsAndReleaseSingleThread
*/
virtual bool synchronizeGCThreadsAndReleaseSingleThread(MM_EnvironmentBase *env, const char *id);

virtual void addToNotifyStallTime(MM_EnvironmentBase *env, uint64_t startTime, uint64_t endTime);
#endif /* J9MODRON_TGC_PARALLEL_STATISTICS */

virtual uintptr_t getRecommendedWorkingThreads() { return _recommendedThreads; };

/**
* Create a ParallelScavengeTask object.
*/
- MM_ParallelScavengeTask(MM_EnvironmentBase *env, MM_ParallelDispatcher *dispatcher, MM_Scavenger *collector,MM_CycleState *cycleState) :
+ MM_ParallelScavengeTask(MM_EnvironmentBase *env, MM_ParallelDispatcher *dispatcher, MM_Scavenger *collector,MM_CycleState *cycleState, uintptr_t recommendedThreads) :
MM_ParallelTask(env, dispatcher)
,_collector(collector)
,_cycleState(cycleState)
,_recommendedThreads(recommendedThreads)
{
_typeId = __FUNCTION__;
};
