Skip to content

Commit

Permalink
Merge pull request #11887 from annando/maxload
Browse files Browse the repository at this point in the history
Pause the worker execution when the load is too high
  • Loading branch information
MrPetovan committed Sep 4, 2022
2 parents 8f28398 + 96ae2c8 commit 934a3a6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 53 deletions.
27 changes: 27 additions & 0 deletions src/Core/System.php
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,33 @@ public static function currentLoad()
return max($load_arr[0], $load_arr[1]);
}

/**
 * Fetch the load averages and process counts reported by /proc/loadavg
 *
 * The file is read directly when it is readable; otherwise `cat` is tried
 * through the shell as a fallback. The content is expected to consist of
 * three decimal numbers followed by two integers separated by a slash.
 *
 * @return array Empty array when the data is unavailable or cannot be parsed,
 *               otherwise an array with the float values 'average1', 'average5',
 *               'average15', 'runnable' and 'scheduled'
 */
public static function getLoadAvg(): array
{
	$content = '';

	// Checking readability first avoids the warning that file_get_contents()
	// raises when the file is missing or not accessible (e.g. on non-Linux hosts).
	if (is_readable('/proc/loadavg')) {
		$content = file_get_contents('/proc/loadavg');
	}

	// shell_exec() can be switched off via "disable_functions"; calling it then
	// fails (with an Error on PHP 8), so the fallback is only used when available.
	if (empty($content) && function_exists('shell_exec')) {
		$content = shell_exec('cat /proc/loadavg');
	}

	if (empty($content)) {
		return [];
	}

	if (!preg_match("#([.\d]+)\s([.\d]+)\s([.\d]+)\s(\d+)/(\d+)#", $content, $matches)) {
		return [];
	}

	return [
		'average1'  => (float)$matches[1],
		'average5'  => (float)$matches[2],
		'average15' => (float)$matches[3],
		// The two counters stay floats so that callers see the same value types as before
		'runnable'  => (float)$matches[4],
		'scheduled' => (float)$matches[5]
	];
}

/**
* Redirects to an external URL (fully qualified URL)
* If you want to route relative to the current Friendica base, use App->internalRedirect()
Expand Down
121 changes: 69 additions & 52 deletions src/Core/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private static function validateInclude(&$file): bool
return false;
}

$file = str_replace(getcwd() . "/", "", $file, $count);
$file = str_replace(getcwd() . '/', '', $file, $count);
if ($count != 1) {
return false;
}
Expand All @@ -301,11 +301,11 @@ private static function validateInclude(&$file): bool
}

$valid = false;
if (strpos($file, "include/") === 0) {
if (strpos($file, 'include/') === 0) {
$valid = true;
}

if (strpos($file, "addon/") === 0) {
if (strpos($file, 'addon/') === 0) {
$valid = true;
}

Expand All @@ -327,19 +327,19 @@ public static function execute(array $queue): bool

// Quit when in maintenance
if (DI::config()->get('system', 'maintenance', false, true)) {
Logger::notice("Maintenance mode - quit process", ['pid' => $mypid]);
Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]);
return false;
}

// Constantly check the number of parallel database processes
if (DI::system()->isMaxProcessesReached()) {
Logger::warning("Max processes reached for process", ['pid' => $mypid]);
Logger::warning('Max processes reached for process', ['pid' => $mypid]);
return false;
}

// Constantly check the number of available database connections to let the frontend be accessible at any time
if (self::maxConnectionsReached()) {
Logger::warning("Max connection reached for process", ['pid' => $mypid]);
Logger::warning('Max connection reached for process', ['pid' => $mypid]);
return false;
}

Expand All @@ -363,7 +363,7 @@ public static function execute(array $queue): bool
if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if (!isset(self::$last_update)) {
self::$last_update = strtotime($queue["executed"]);
self::$last_update = strtotime($queue['executed']);
}

$age = (time() - self::$last_update) / 60;
Expand Down Expand Up @@ -393,26 +393,26 @@ public static function execute(array $queue): bool

// The script could be provided as full path or only with the function name
if ($include == basename($include)) {
$include = "include/".$include.".php";
$include = 'include/' . $include . '.php';
}

if (!self::validateInclude($include)) {
Logger::warning("Include file is not valid", ['file' => $argv[0]]);
Logger::warning('Include file is not valid', ['file' => $argv[0]]);
$stamp = (float)microtime(true);
DBA::delete('workerqueue', ['id' => $queue["id"]]);
DBA::delete('workerqueue', ['id' => $queue['id']]);
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
return true;
}

require_once $include;

$funcname = str_replace(".php", "", basename($argv[0]))."_run";
$funcname = str_replace('.php', '', basename($argv[0])) .'_run';

if (function_exists($funcname)) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if (!isset(self::$last_update)) {
self::$last_update = strtotime($queue["executed"]);
self::$last_update = strtotime($queue['executed']);
}

$age = (time() - self::$last_update) / 60;
Expand All @@ -428,15 +428,15 @@ public static function execute(array $queue): bool
self::execFunction($queue, $funcname, $argv, false);

$stamp = (float)microtime(true);
if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
} else {
Logger::warning("Function does not exist", ['function' => $funcname]);
Logger::warning('Function does not exist', ['function' => $funcname]);
$stamp = (float)microtime(true);
DBA::delete('workerqueue', ['id' => $queue["id"]]);
DBA::delete('workerqueue', ['id' => $queue['id']]);
self::$db_duration = (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
}
Expand All @@ -458,15 +458,32 @@ private static function execFunction(array $queue, string $funcname, array $argv
{
$a = DI::app();

$cooldown = DI::config()->get("system", "worker_cooldown", 0);
$cooldown = DI::config()->get('system', 'worker_cooldown', 0);
if ($cooldown > 0) {
Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
Logger::debug('Pre execution cooldown.', ['cooldown' => $cooldown, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
sleep($cooldown);
}

$load_cooldown = DI::config()->get('system', 'worker_load_cooldown');
$processes_cooldown = DI::config()->get('system', 'worker_processes_cooldown');

while ((($load_cooldown > 0) || ($processes_cooldown > 0)) && ($load = System::getLoadAvg())) {
if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) {
Logger::debug('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
sleep(1);
continue;
}
if (($processes_cooldown > 0) && ($load['scheduled'] > $processes_cooldown)) {
Logger::debug('Process induced pre execution cooldown.', ['max' => $processes_cooldown, 'load' => $load, 'id' => $queue['id'], 'priority' => $queue['priority'], 'command' => $queue['command']]);
sleep(1);
continue;
}
break;
}

Logger::enableWorker($funcname);

Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]);
Logger::info('Process start.', ['priority' => $queue['priority'], 'id' => $queue['id']]);

$stamp = (float)microtime(true);

Expand Down Expand Up @@ -518,21 +535,21 @@ private static function execFunction(array $queue, string $funcname, array $argv
self::$lock_duration = 0;

if ($duration > 3600) {
Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 600) {
Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 300) {
Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
} elseif ($duration > 120) {
Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]);
Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration/60, 3)]);
}

Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]);
Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);

DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);

if ($cooldown > 0) {
Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]);
Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
sleep($cooldown);
}
}
Expand All @@ -546,16 +563,16 @@ private static function execFunction(array $queue, string $funcname, array $argv
private static function maxConnectionsReached(): bool
{
// Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
$max = DI::config()->get("system", "max_connections");
$max = DI::config()->get('system', 'max_connections');

// Fetch the percentage level where the worker will get active
$maxlevel = DI::config()->get("system", "max_connections_level", 75);
$maxlevel = DI::config()->get('system', 'max_connections_level', 75);

if ($max == 0) {
// the maximum number of possible user connections can be a system variable
$r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
if (DBA::isResult($r)) {
$max = $r["Value"];
$max = $r['Value'];
}
// Or it can be granted. This overrides the system variable
$stamp = (float)microtime(true);
Expand All @@ -581,12 +598,12 @@ private static function maxConnectionsReached(): bool
$used = DBA::numRows($r);
DBA::close($r);

Logger::info("Connection usage (user values)", ['usage' => $used, 'max' => $max]);
Logger::info('Connection usage (user values)', ['usage' => $used, 'max' => $max]);

$level = ($used / $max) * 100;

if ($level >= $maxlevel) {
Logger::warning("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
Logger::warning('Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used .'/' . $max);
return true;
}
}
Expand All @@ -597,26 +614,26 @@ private static function maxConnectionsReached(): bool
if (!DBA::isResult($r)) {
return false;
}
$max = intval($r["Value"]);
$max = intval($r['Value']);
if ($max == 0) {
return false;
}
$r = DBA::fetchFirst("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
if (!DBA::isResult($r)) {
return false;
}
$used = intval($r["Value"]);
$used = intval($r['Value']);
if ($used == 0) {
return false;
}
Logger::info("Connection usage (system values)", ['used' => $used, 'max' => $max]);
Logger::info('Connection usage (system values)', ['used' => $used, 'max' => $max]);

$level = $used / $max * 100;

if ($level < $maxlevel) {
return false;
}
Logger::warning("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
Logger::warning('Maximum level (' . $level . '%) of system connections reached: ' . $used . '/' . $max);
return true;
}

Expand All @@ -629,7 +646,7 @@ private static function maxConnectionsReached(): bool
*/
private static function tooMuchWorkers(): bool
{
$queues = DI::config()->get("system", "worker_queues", 10);
$queues = DI::config()->get('system', 'worker_queues', 10);

$maxqueues = $queues;

Expand All @@ -638,7 +655,7 @@ private static function tooMuchWorkers(): bool
// Decrease the number of workers at higher load
$load = System::currentLoad();
if ($load) {
$maxsysload = intval(DI::config()->get("system", "maxloadavg", 20));
$maxsysload = intval(DI::config()->get('system', 'maxloadavg', 20));

/* Default exponent 3 causes queues to rapidly decrease as load increases.
* If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload.
Expand Down Expand Up @@ -690,8 +707,8 @@ private static function tooMuchWorkers(): bool
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_stat += (microtime(true) - $stamp);
$idle_workers -= $running;
$waiting_processes += $entry["entries"];
$listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"];
$waiting_processes += $entry['entries'];
$listitem[$entry['priority']] = $entry['priority'] . ':' . $running . '/' . $entry['entries'];
}
DBA::close($jobs);
} else {
Expand All @@ -702,33 +719,33 @@ private static function tooMuchWorkers(): bool
self::$db_duration_stat += (microtime(true) - $stamp);

while ($entry = DBA::fetch($jobs)) {
$idle_workers -= $entry["running"];
$listitem[$entry['priority']] = $entry['priority'].":".$entry["running"];
$idle_workers -= $entry['running'];
$listitem[$entry['priority']] = $entry['priority'] . ':' . $entry['running'];
}
DBA::close($jobs);
}

$waiting_processes -= $deferred;

$listitem[0] = "0:" . max(0, $idle_workers);
$listitem[0] = '0:' . max(0, $idle_workers);

$processlist .= ' ('.implode(', ', $listitem).')';

if (DI::config()->get("system", "worker_fastlane", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
if (DI::config()->get('system', 'worker_fastlane', false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
$top_priority = self::highestPriority();
$high_running = self::processWithPriorityActive($top_priority);

if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
Logger::info("Jobs with a higher priority are waiting but none is executed. Open a fastlane.", ['priority' => $top_priority]);
Logger::info('Jobs with a higher priority are waiting but none is executed. Open a fastlane.', ['priority' => $top_priority]);
$queues = $active + 1;
}
}

Logger::notice("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues);
Logger::notice('Load: ' . $load . '/' . $maxsysload . ' - processes: ' . $deferred . '/' . $active . '/' . $waiting_processes . $processlist . ' - maximum: ' . $queues . '/' . $maxqueues);

// Are there fewer workers running as possible? Then fork a new one.
if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
if (!DI::config()->get('system', 'worker_dont_fork', false) && ($queues > ($active + 1)) && self::entriesExists()) {
Logger::info('There are fewer workers as possible, fork a new worker.', ['active' => $active, 'queues' => $queues]);
if (Worker\Daemon::isMode()) {
Worker\IPC::SetJobState(true);
} else {
Expand Down Expand Up @@ -1117,12 +1134,12 @@ public static function spawnWorker(bool $do_cron = false)
* @param (integer|array) priority or parameter array, strings are deprecated and are ignored
*
* next args are passed as $cmd command line
* or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id);
* or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "Delivery", $post_id);
* or: Worker::add(PRIORITY_HIGH, 'Notifier', Delivery::DELETION, $drop_id);
* or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'Delivery', $post_id);
*
* @return int "0" if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
* @return int '0' if worker queue entry already existed or there had been an error, otherwise the ID of the worker task
* @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @note $cmd and string args are surrounded with ""
* @note $cmd and string args are surrounded with ''
*
* @hooks 'proc_run'
* array $arr
Expand All @@ -1136,14 +1153,14 @@ public static function add(...$args)

$arr = ['args' => $args, 'run_cmd' => true];

Hook::callAll("proc_run", $arr);
Hook::callAll('proc_run', $arr);
if (!$arr['run_cmd'] || !count($args)) {
return 1;
}

$priority = PRIORITY_MEDIUM;
// Don't fork from frontend tasks by default
$dont_fork = DI::config()->get("system", "worker_dont_fork", false) || !DI::mode()->isBackend();
$dont_fork = DI::config()->get('system', 'worker_dont_fork', false) || !DI::mode()->isBackend();
$created = DateTimeFormat::utcNow();
$delayed = DBA::NULL_DATETIME;
$force_priority = false;
Expand Down
Loading

0 comments on commit 934a3a6

Please sign in to comment.