Skip to content

Commit

Permalink
Added heartbeat functionality to configuration and generator, making …
Browse files Browse the repository at this point in the history
…it possible to dynamically change machine ID.
  • Loading branch information
Tomasz Struczyński committed Feb 5, 2016
1 parent 9566924 commit a443d74
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 11 deletions.
8 changes: 8 additions & 0 deletions Tests/FixedConfigTest.php
Expand Up @@ -19,4 +19,12 @@ public function testCreateWithLogger()
$config = new \Gendoria\CruftFlake\Config\FixedConfig(1, new \Psr\Log\NullLogger());
$this->assertEquals(1, $config->getMachine());
}

public function testHeartbeat()
{
$config = new \Gendoria\CruftFlake\Config\FixedConfig(10);
$this->assertEquals(10, $config->getMachine());
$this->assertFalse($config->heartbeat());
$this->assertEquals(10, $config->getMachine());
}
}
36 changes: 35 additions & 1 deletion Tests/GeneratorTest.php
Expand Up @@ -49,7 +49,22 @@ private function buildSystemUnderTest32Bit()
return $generator;
}


/**
* Get generator for normal tests.
*
* @return Generator
*/
private function buildSystemUnderTestHeartbeat($newMachineId)
{
$this->config->expects($this->exactly(2))
->method('getMachine')
->will($this->onConsecutiveCalls($this->machineId, $newMachineId));
$this->config->expects($this->once())
->method('heartbeat')
->will($this->returnValue(true));
return new Generator($this->config, $this->timer);
}

private function assertId($id)
{
$this->assertTrue(is_string($id));
Expand Down Expand Up @@ -329,4 +344,23 @@ public function testLargestTimestampWithLargestEverythingElse()
$this->assertReallyNotEquals($id1, $id2);
$this->assertEquals('9223372036854771712', $id1);
}

public function testHeartbeat()
{
$this->machineId = 1;
$this->timer->expects($this->any())
->method('getUnixTimestamp')
->will($this->returnValue(1325376000000));
$cf = $this->buildSystemUnderTestHeartbeat(2);

//Test. First ID, heartbeat changing machine ID to 2, second ID
$id1 = $cf->generate();
$cf->heartbeat();
$id2 = $cf->generate();

$expectedId1 = $value = (0 << 22) | (1 << 12) | 0; //timestamp 0, machine 1, sequence 0
$expectedId2 = $value = (0 << 22) | (2 << 12) | 1; //timestamp 0, machine 2, sequence 1
$this->assertEquals($expectedId1, $id1);
$this->assertEquals($expectedId2, $id2);
}
}
8 changes: 4 additions & 4 deletions Tests/ZmqServerTest.php
Expand Up @@ -11,7 +11,7 @@ class ZmqServerTest extends PHPUnit_Framework_TestCase
{
public function testGenerate()
{
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('generate'), array(), '', false);
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('generate', 'heartbeat'), array(), '', false);
$generator->expects($this->once())
->method('generate')
->will($this->returnValue(10));
Expand All @@ -37,7 +37,7 @@ public function testGenerate()

public function testGenerateException()
{
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('generate'), array(), '', false);
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('generate', 'heartbeat'), array(), '', false);
$generator->expects($this->once())
->method('generate')
->will($this->throwException(new Exception()));
Expand All @@ -63,7 +63,7 @@ public function testGenerateException()

public function testStatus()
{
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('status'), array(), '', false);
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('status', 'heartbeat'), array(), '', false);
$generatorStatus = new Gendoria\CruftFlake\Generator\GeneratorStatus(1, 1, 1, true);
$generator->expects($this->once())
->method('status')
Expand All @@ -90,7 +90,7 @@ public function testStatus()

public function testUnknownCommand()
{
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('status'), array(), '', false);
$generator = $this->getMock('\Gendoria\CruftFlake\Generator\Generator', array('status', 'heartbeat'), array(), '', false);

$socket = $this->getMock('ZMQSocket', array('recv', 'send'), array(), '', false);
$socket->expects($this->once())
Expand Down
7 changes: 7 additions & 0 deletions src/Gendoria/CruftFlake/Config/ConfigInterface.php
Expand Up @@ -17,4 +17,11 @@ interface ConfigInterface
* @return int Should be a 10-bit int (decimal 0 to 1023)
*/
public function getMachine();

/**
* Configuration heartbeat.
*
* @return bool True, if configuration data had been changed during heartbeat.
*/
public function heartbeat();
}
10 changes: 10 additions & 0 deletions src/Gendoria/CruftFlake/Config/FixedConfig.php
Expand Up @@ -59,6 +59,16 @@ public function getMachine()
return $this->machineId;
}

/**
* {@inheritdoc}
*
* This function will always return false, as fixed config does not resync machine ID.
*/
public function heartbeat()
{
return false;
}

/**
* Set logger.
*
Expand Down
13 changes: 12 additions & 1 deletion src/Gendoria/CruftFlake/Config/ZooKeeperConfig.php
Expand Up @@ -121,6 +121,16 @@ public function getMachine()
return (int) $machineId;
}

/**
* Periodically re-syncs with zookeeper, to obtain new machine ID, if necessary.
*
* {@inheritdoc}
*/
public function heartbeat()
{
return false;
}

/**
* Compare found machine information with expected values.
*
Expand Down Expand Up @@ -179,7 +189,7 @@ private function createMachineInfo(array $children, array $machineInfo)
/**
* Get mac address and hostname.
*
* @return array "hostname","processId" keys
* @return array "hostname","processId", "time" keys
*/
private function getMachineInfo()
{
Expand All @@ -191,6 +201,7 @@ private function getMachineInfo()
throw new RuntimeException('Unable to identify machine hostname');
}
$info['processId'] = $this->procesId;
$info['time'] = (int) floor(microtime(true) * 1000);

return $info;
}
Expand Down
24 changes: 24 additions & 0 deletions src/Gendoria/CruftFlake/Generator/Generator.php
Expand Up @@ -90,6 +90,13 @@ class Generator
*/
private $lastTime = null;

/**
* Config.
*
* @var ConfigInterface
*/
private $config;

/**
* Constructor.
*
Expand All @@ -98,6 +105,7 @@ class Generator
*/
public function __construct(ConfigInterface $config, TimerInterface $timer)
{
$this->config = $config;
$this->machine = $config->getMachine();
if (!is_int($this->machine) || $this->machine < 0 || $this->machine > 1023) {
throw new InvalidArgumentException(
Expand Down Expand Up @@ -194,6 +202,22 @@ public function status()
$this->sequence, (PHP_INT_SIZE === 4));
}

/**
* Perform configuration heartbeat.
*
* This refreshes the configuration and may eventually result in obtaining new machine ID.
* It may be usefull, when we want to perform garbage collection for stalled machine IDs
* in some configuration mechanisms.
*/
public function heartbeat()
{
if ($this->config->heartbeat()) {
$this->machine = $this->config->getMachine();
//Just to be sure, sleep 1 microsecond to reset sequence
usleep(1);
}
}

private function mintId32($timestamp, $machine, $sequence)
{
$hi = (int) ($timestamp / pow(2, 10));
Expand Down
10 changes: 5 additions & 5 deletions src/Gendoria/CruftFlake/Zmq/ZmqServer.php
Expand Up @@ -17,7 +17,6 @@

class ZmqServer implements ServerInterface, LoggerAwareInterface
{

/**
* Cruft flake generator.
*
Expand Down Expand Up @@ -68,9 +67,10 @@ public function run()
$receiver = $this->getZmqSocket($this->dsn);
while (true) {
$msg = $receiver->recv();
$this->logger->debug('ZMQ server received command: ' . $msg);
$this->logger->debug('ZMQ server received command: '.$msg);
$response = $this->runCommand($msg);
$receiver->send(json_encode($response));
$this->generator->heartbeat();
if ($this->debugMode) {
break;
}
Expand All @@ -85,7 +85,8 @@ private function runCommand($msg)
case 'STATUS':
return $this->commandStatus();
default:
$this->logger->debug('Unknown command received: ' . $msg);
$this->logger->debug('Unknown command received: '.$msg);

return $this->createResponse('UNKNOWN COMMAND', 404);
}
}
Expand All @@ -100,7 +101,7 @@ private function commandGenerate()
try {
$response = $this->createResponse($this->generator->generate());
} catch (Exception $e) {
$this->logger->error('Generator error: ' . $e->getMessage(), array($e, $this));
$this->logger->error('Generator error: '.$e->getMessage(), array($e, $this));
$response = $this->createResponse('ERROR', 500);
}

Expand Down Expand Up @@ -154,5 +155,4 @@ public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;
}

}

0 comments on commit a443d74

Please sign in to comment.