Skip to content

Commit

Permalink
Refactor add()
Browse files Browse the repository at this point in the history
  • Loading branch information
chadicus committed Apr 27, 2017
1 parent 1c58fe9 commit 33ba815
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 56 deletions.
114 changes: 71 additions & 43 deletions src/ProcessRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,55 +77,17 @@ public function add(
) : bool {
//loop in case the update fails its optimistic concurrency check
for ($i = 0; $i < 5; ++$i) {
$this->collection->findOneAndUpdate(
['_id' => $id],
['$setOnInsert' => ['hosts' => [], 'version' => new ObjectID()]],
['upsert' => true]
);
$existing = $this->collection->findOne(
['_id' => $id],
['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
);

$replacement = $existing;
$replacement['version'] = new ObjectID();

//clean $replacement based on their pids and expire times
foreach ($existing['hosts'] as $hostname => $pids) {
foreach ($pids as $pid => $expires) {
//our machine and not running
//the task expired
//our machine and pid is recycled (should rarely happen)
if (($hostname === $this->encodedHostName && !file_exists("/proc/{$pid}"))
|| time() > $expires
|| ($hostname === $this->encodedHostName && $pid === $this->pid)
) {
unset($replacement['hosts'][$hostname][$pid]);
}
}

if (empty($replacement['hosts'][$hostname])) {
unset($replacement['hosts'][$hostname]);
}
}

$totalPidCount = 0;
foreach ($replacement['hosts'] as $hostname => $pids) {
$totalPidCount += count($pids);
}

$this->ensureProcessDocumentExists($id);
$existing = $this->getExistingProcessDocument($id);
$replacement = $this->generateReplacementDocument($existing);
$totalPidCount = $this->getTotalPidCount($replacement);
$thisHostPids = array_key_exists($this->encodedHostName, $replacement['hosts']) ? $replacement['hosts'][$this->encodedHostName] : [];

if ($totalPidCount >= $maxGlobalProcesses || count($thisHostPids) >= $maxHostProcesses) {
return false;
}

// add our process
$expireSecs = time() + $minsBeforeExpire * 60;
//ensure expireSecs is between 0 and self::MONGO_INT32_MAX
$expireSecs = max(0, min($expireSecs, self::MONGO_INT32_MAX));

$thisHostPids[$this->pid] = $expireSecs;
$thisHostPids[$this->pid] = $this->getExpiresSeconds($minsBeforeExpire);
$replacement['hosts'][$this->encodedHostName] = $thisHostPids;

$status = $this->collection->replaceOne(
Expand All @@ -142,6 +104,72 @@ public function add(
return false;
}

private function ensureProcessDocumentExists(string $id)
{
$this->collection->findOneAndUpdate(
['_id' => $id],
['$setOnInsert' => ['hosts' => [], 'version' => new ObjectID()]],
['upsert' => true]
);
}

private function generateReplacementDocument(array $existing) : array
{
$replacement = $existing;
$replacement['version'] = new ObjectID();

//clean $replacement based on their pids and expire times
foreach ($existing['hosts'] as $hostname => $pids) {
$this->removeExpiredPids($pids, $hostname, $replacement);
if (empty($replacement['hosts'][$hostname])) {
unset($replacement['hosts'][$hostname]);
}
}

return $replacement;
}

private function getExistingProcessDocument(string $id) : array
{
return $this->collection->findOne(
['_id' => $id],
['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
);
}

private function removeExpiredPids(array $pids, string $hostname, array &$replacement)
{
foreach ($pids as $pid => $expires) {
if ($this->canRemovePid($hostname, $pid, $expires)) {
unset($replacement['hosts'][$hostname][$pid]);
}
}
}

private function canRemovePid(string $hostname, int $pid, int $expires) : bool
{
return (($hostname === $this->encodedHostName && !file_exists("/proc/{$pid}"))
|| time() > $expires
|| ($hostname === $this->encodedHostName && $pid === $this->pid));
}

private function getTotalPidCount(array $document) : int
{
$totalPidCount = 0;
foreach ($document['hosts'] as $hostname => $pids) {
$totalPidCount += count($pids);
}

return $totalPidCount;
}

private function getExpiresSeconds(int $minsBeforeExpire) : int
{
$expireSecs = time() + $minsBeforeExpire * 60;
//ensure expireSecs is between 0 and self::MONGO_INT32_MAX
return max(0, min($expireSecs, self::MONGO_INT32_MAX));
}

/**
* Removes from process registry. Does not do anything needed for use of the add() method. Most will only use at the
* end of their script so the mongo collection is up to date.
Expand Down
26 changes: 13 additions & 13 deletions tests/ProcessRegistryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* @covers ::<private>
* @covers ::__construct
*/
final class ProcessRegistryTest extends \PHPUnit_Framework_TestCase
final class ProcessRegistryTest extends \PHPUnit\Framework\TestCase
{
/**
* Mongo collection to use in each test.
Expand Down Expand Up @@ -79,7 +79,7 @@ public function addExistingDifferentHost()
$initialVersion = new ObjectID();
$initalTask = [
'_id' => 'testId',
'hosts' => ['a host' => ['a pid' => $expireSecs]],
'hosts' => ['a host' => ['1' => $expireSecs]],
'version' => $initialVersion,
];

Expand All @@ -93,7 +93,7 @@ public function addExistingDifferentHost()
$expected = [
'_id' => 'testId',
'hosts' => [
'a host' => ['a pid' => $expireSecs],
'a host' => ['1' => $expireSecs],
HOSTNAME => [getmypid() => ProcessRegistry::MONGO_INT32_MAX],
],
'version' => $result['version'],
Expand All @@ -113,7 +113,7 @@ public function addOverMaxGlobalProcessesOnDifferentHost()
{
$initalTask = [
'_id' => 'testId',
'hosts' => ['different host' => ['a pid' => time() + 60]],
'hosts' => ['different host' => ['1' => time() + 60]],
'version' => new ObjectID(),
];

Expand Down Expand Up @@ -187,7 +187,7 @@ public function addCleaningNotRunningProcessWithoutExtra()
$initialVersion = new ObjectID();
$initalTask = [
'_id' => 'testId',
'hosts' => [HOSTNAME => ['a pid' => time() + 60]],
'hosts' => [HOSTNAME => ['9999999' => time() + 60]],
'version' => $initialVersion,
];

Expand Down Expand Up @@ -233,7 +233,7 @@ public function addCleaningNotRunningProcessWithExtra()
'hosts' => [
HOSTNAME => [
$extraPid => $expireSecs,
'a pid' => $expireSecs,
'9999999' => $expireSecs,
],
],
'version' => $initialVersion,
Expand Down Expand Up @@ -267,7 +267,7 @@ public function addCleaningExpiredProcessWithoutExtra()
$initialVersion = new ObjectID();
$initalTask = [
'_id' => 'testId',
'hosts' => ['different host' => ['a pid' => strtotime('-1 hour')]],
'hosts' => ['different host' => ['1' => strtotime('-1 hour')]],
'version' => $initialVersion,
];

Expand Down Expand Up @@ -302,8 +302,8 @@ public function addCleaningExpiredProcessWithExtra()
'_id' => 'testId',
'hosts' => [
'a host' => [
'expiring pid' => time() - 1,
'a pid' => $expireSecs,
'2' => time() - 1,
'1' => $expireSecs,
],
],
'version' => $initialVersion,
Expand All @@ -319,7 +319,7 @@ public function addCleaningExpiredProcessWithExtra()
$expected = [
'_id' => 'testId',
'hosts' => [
'a host' => ['a pid' => $expireSecs],
'a host' => ['1' => $expireSecs],
HOSTNAME => [getmypid() => ProcessRegistry::MONGO_INT32_MAX]
],
'version' => $result['version'],
Expand Down Expand Up @@ -440,7 +440,7 @@ public function removeWithExistingProcess()
$initalTask = [
'_id' => 'testId',
'hosts' => [
HOSTNAME => ['a pid' => 0, getmypid() => time() + 60],
HOSTNAME => ['1' => 0, getmypid() => time() + 60],
],
'version' => $initialVersion,
];
Expand All @@ -453,7 +453,7 @@ public function removeWithExistingProcess()
$result = $this->collection->findOne();

$this->assertSame(
['_id' => 'testId', 'hosts' => [HOSTNAME => ['a pid' => 0]], 'version' => $result['version']],
['_id' => 'testId', 'hosts' => [HOSTNAME => ['1' => 0]], 'version' => $result['version']],
$result
);
$this->assertNotSame((string)$initialVersion, (string)$result['version']);
Expand Down Expand Up @@ -630,7 +630,7 @@ public function addTooMuchConcurrency()
{
$mockUpdateResult = $this->getMockBuilder('\\MongoDB\\UpdateResult')->disableOriginalConstructor()->getMock();
$mockUpdateResult->method('getMatchedCount')->willReturn(0);
$entry = new \ArrayObject(['_id' => 'myTask', 'hosts' => []]);
$entry = ['_id' => 'myTask', 'hosts' => [], 'version' => new ObjectID()];
$mockCollection = $this->getMockBuilder('\\MongoDB\\Collection')->disableOriginalConstructor()->getMock();
$mockCollection->method('findOne')->willReturn($entry);
$mockCollection->method('replaceOne')->willReturn($mockUpdateResult);
Expand Down

0 comments on commit 33ba815

Please sign in to comment.