From 43e001e809bfab3e5066aecbe978e89ba553e762 Mon Sep 17 00:00:00 2001 From: Dmitriy Ulyanov Date: Mon, 24 Feb 2014 01:41:15 +0400 Subject: [PATCH] Added LoggerAppenderAMQP --- src/examples/php/appender_amqp.php | 22 + src/examples/resources/appender_amqp.xml | 32 + src/main/php/LoggerAutoloader.php | 1 + src/main/php/appenders/LoggerAppenderAMQP.php | 576 ++++++++++++++++++ src/site/site.xml | 1 + src/site/xdoc/docs/appenders/amqp.xml | 197 ++++++ .../php/appenders/LoggerAppenderAMQPTest.php | 281 +++++++++ 7 files changed, 1110 insertions(+) create mode 100644 src/examples/php/appender_amqp.php create mode 100644 src/examples/resources/appender_amqp.xml create mode 100644 src/main/php/appenders/LoggerAppenderAMQP.php create mode 100644 src/site/xdoc/docs/appenders/amqp.xml create mode 100644 src/test/php/appenders/LoggerAppenderAMQPTest.php diff --git a/src/examples/php/appender_amqp.php b/src/examples/php/appender_amqp.php new file mode 100644 index 00000000..c59b7266 --- /dev/null +++ b/src/examples/php/appender_amqp.php @@ -0,0 +1,22 @@ +debug("Hello World!"); diff --git a/src/examples/resources/appender_amqp.xml b/src/examples/resources/appender_amqp.xml new file mode 100644 index 00000000..39b4216e --- /dev/null +++ b/src/examples/resources/appender_amqp.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + diff --git a/src/main/php/LoggerAutoloader.php b/src/main/php/LoggerAutoloader.php index 86098c51..bd44ef55 100644 --- a/src/main/php/LoggerAutoloader.php +++ b/src/main/php/LoggerAutoloader.php @@ -54,6 +54,7 @@ class LoggerAutoloader { 'LoggerThrowableInformation' => '/LoggerThrowableInformation.php', // Appenders + 'LoggerAppenderAMQP' => '/appenders/LoggerAppenderAMQP.php', 'LoggerAppenderConsole' => '/appenders/LoggerAppenderConsole.php', 'LoggerAppenderDailyFile' => '/appenders/LoggerAppenderDailyFile.php', 'LoggerAppenderEcho' => '/appenders/LoggerAppenderEcho.php', diff --git a/src/main/php/appenders/LoggerAppenderAMQP.php b/src/main/php/appenders/LoggerAppenderAMQP.php new file mode 100644 index 00000000..2cfdb868 --- /dev/null +++ b/src/main/php/appenders/LoggerAppenderAMQP.php @@ -0,0 +1,576 @@ +processLog( + $this->layout->format($event), + $this->getFlushOnShutdown() + ); + } + + /** + * @param string $message + * @param boolean $flushOnShutdown + */ + public function processLog($message, $flushOnShutdown) + { + if ($flushOnShutdown) { + $this->stashLog($message); + } else { + $this->sendLogToAMQP($message); + } + } + + /** + * Setup AMQP connection. + * Based on defined options, this method connects to the AMQP + * and creates a {@link $AMQPConnection} and {@link $AMQPExchange}. + */ + public function activateOptions() { + try { + $connection = $this->createAMQPConnection( + $this->getHost(), + $this->getPort(), + $this->getVhost(), + $this->getLogin(), + $this->getPassword(), + $this->getConnectionTimeout() + ); + + $this->setAMQPConnection($connection); + + $exchange = $this->createAMQPExchange( + $connection, + $this->getExchangeName(), + $this->getExchangeType() + ); + + $this->setAMQPExchange($exchange); + } catch (AMQPConnectionException $e) { + $this->closed = true; + $this->warn(sprintf('Failed to connect to amqp server: %s', $e->getMessage())); + } catch (AMQPChannelException $e) { + $this->closed = true; + $this->warn(sprintf('Failed to open amqp channel: %s', $e->getMessage())); + } catch (AMQPExchangeException $e) { + $this->closed = true; + $this->warn(sprintf('Failed to declare amqp exchange: %s', $e->getMessage())); + } catch (Exception $e) { + $this->closed = true; + $this->warn(sprintf('Amqp connection exception: %s', $e->getMessage())); + } + } + + /** + * @param string $host + * @param int $port + * @param string $vhost + * @param string $login + * @param string $password + * @param float $connectionTimeout + * @return AMQPConnection + * @throws AMQPConnectionException + * @throws Exception + */ + protected function createAMQPConnection($host, $port, $vhost, $login, $password, $connectionTimeout) + { + $connection = new AMQPConnection(); + $connection->setHost($host); + $connection->setPort($port); + $connection->setVhost($vhost); + $connection->setLogin($login); + $connection->setPassword($password); + $connection->setReadTimeout($connectionTimeout); + + if (!$connection->connect()) { + throw new Exception('Cannot connect to the broker'); + } + + return $connection; + } + + /** + * @param AMQPConnection $AMQPConnection + * @param $exchangeName + * @param $exchangeType + * @return AMQPExchange + * @throws AMQPConnectionException + * @throws AMQPExchangeException + * @throws Exception + */ + protected function createAMQPExchange($AMQPConnection, $exchangeName, $exchangeType) + { + $channel = new AMQPChannel($AMQPConnection); + $exchange = new AMQPExchange($channel); + $exchange->setName($exchangeName); + $exchange->setType($exchangeType); + $exchange->setFlags(AMQP_DURABLE); + + // Since php_amqp 1.2.0: deprecate AMQPExchange::declare() in favor of AMQPExchange::declareExchange() + $declareMethodName = method_exists($exchange, 'declareExchange') ? 'declareExchange' : 'declare'; + + if (!$exchange->$declareMethodName()) { + throw new Exception('Cannot declare exchange'); + } + + return $exchange; + } + + /** + * @param AMQPConnection $AMQPConnection + */ + protected function setAMQPConnection($AMQPConnection) + { + $this->AMQPConnection = $AMQPConnection; + } + + /** + * @return AMQPConnection + */ + public function getAMQPConnection() + { + return $this->AMQPConnection; + } + + /** + * @param AMQPExchange $AMQPExchange + */ + protected function setAMQPExchange($AMQPExchange) + { + $this->AMQPExchange = $AMQPExchange; + } + + /** + * @return AMQPExchange + */ + public function getAMQPExchange() + { + return $this->AMQPExchange; + } + + /** + * @param string $AMQPRoutingKey + */ + public function setRoutingKey($AMQPRoutingKey) + { + $this->setString('routingKey', $AMQPRoutingKey); + } + + /** + * @return string + */ + public function getRoutingKey() + { + return $this->routingKey; + } + + /** + * @param string $host + */ + public function setHost($host) + { + $this->setString('host', $host); + } + + /** + * @return string + */ + public function getHost() + { + return $this->host; + } + + /** + * @param string $login + */ + public function setLogin($login) + { + $this->setString('login', $login); + } + + /** + * @return string + */ + public function getLogin() + { + return $this->login; + } + + /** + * @param string $password + */ + public function setPassword($password) + { + $this->setString('password', $password); + } + + /** + * @return string + */ + public function getPassword() + { + return $this->password; + } + + /** + * @param int $port + */ + public function setPort($port) + { + $this->setPositiveInteger('port', $port); + } + + /** + * @return int + */ + public function getPort() + { + return $this->port; + } + + /** + * @param string $vhost + */ + public function setVhost($vhost) + { + $this->setString('vhost', $vhost); + } + + /** + * @return string + */ + public function getVhost() + { + return $this->vhost; + } + + /** + * @param string $exchange + */ + public function setExchangeName($exchange) + { + $this->setString('exchangeName', $exchange); + } + + /** + * @return string + */ + public function getExchangeName() + { + return $this->exchangeName; + } + + /** + * @param string $exchangeType + */ + public function setExchangeType($exchangeType) + { + $this->setString('exchangeType', $exchangeType); + } + + /** + * @return string + */ + public function getExchangeType() + { + return $this->exchangeType; + } + + /** + * @param string $contentEncoding + */ + public function setContentEncoding($contentEncoding) + { + $this->setString('contentEncoding', $contentEncoding); + } + + /** + * @return string + */ + public function getContentEncoding() + { + return $this->contentEncoding; + } + + /** + * @param string $contentType + */ + public function setContentType($contentType) + { + $this->setString('contentType', $contentType); + } + + /** + * @return string + */ + public function getContentType() + { + return $this->contentType; + } + + /** + * @param float $connectionTimeout + */ + public function setConnectionTimeout($connectionTimeout) + { + if (is_numeric($connectionTimeout) && $connectionTimeout > 0) { + $this->connectionTimeout = floatval($connectionTimeout); + } else { + $this->warn("Invalid value given for 'connectionTimeout' property: [$connectionTimeout]. Expected a positive float. Property not changed."); + } + } + + /** + * @return float + */ + public function getConnectionTimeout() + { + return $this->connectionTimeout; + } + + /** + * @param boolean $flushOnShutdown + */ + public function setFlushOnShutdown($flushOnShutdown) + { + $this->setBoolean('flushOnShutdown', $flushOnShutdown); + } + + /** + * @return boolean + */ + public function getFlushOnShutdown() + { + return $this->flushOnShutdown; + } + + /** + * @param array $logs Array of strings + */ + public function sendLogsArrayToAMQP($logs) + { + foreach ($logs as $log) { + if ($this->closed) { + break; + } + + $this->sendLogToAMQP($log); + } + } + + /** + * @param string $log + */ + public function sendLogToAMQP($log) + { + try { + $this->getAMQPExchange()->publish( + $log, + $this->getRoutingKey(), + AMQP_NOPARAM, + array( + 'content_type' => $this->getContentType(), + 'content_encoding' => $this->getContentEncoding() + ) + ); + } catch (AMQPConnectionException $e) { + $this->closed = true; + $this->warn(sprintf('Connection to the broker was lost: %s', $e->getMessage())); + } catch (AMQPChannelException $e) { + $this->closed = true; + $this->warn(sprintf('Channel is not open: %s', $e->getMessage())); + } catch (AMQPExchangeException $e) { + $this->closed = true; + $this->warn(sprintf('Failed to publish message: %s', $e->getMessage())); + } catch (Exception $e) { + $this->warn(sprintf('Failed to publish message, unknown exception: %s', $e->getMessage())); + } + } + + /** + * @param string $log + */ + public function stashLog($log) + { + $this->logsStash[] = $log; + } + + public function cleanStashedLogs() + { + $this->logsStash = array(); + } + + public function close() + { + if ($this->getFlushOnShutdown()) { + $this->sendLogsArrayToAMQP($this->logsStash); + $this->cleanStashedLogs(); + } + + $this->setAMQPExchange(null); + $this->setAMQPConnection(null); + + parent::close(); + } +} \ No newline at end of file diff --git a/src/site/site.xml b/src/site/site.xml index a4b3d063..e06461bc 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -44,6 +44,7 @@ + diff --git a/src/site/xdoc/docs/appenders/amqp.xml b/src/site/xdoc/docs/appenders/amqp.xml new file mode 100644 index 00000000..31478d78 --- /dev/null +++ b/src/site/xdoc/docs/appenders/amqp.xml @@ -0,0 +1,197 @@ + + + + + + LoggerAppenderAMQP + + + +
+ +

LoggerAppenderAMQP appends log events to a AMQP instance.

+ +

The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol + for message-oriented middleware. The defining features of AMQP are message orientation, + queuing, routing (including point-to-point and publish-and-subscribe), reliability and security.

+ + +

This appender requires a layout. If no layout is specified in configuration, + LoggerLayoutSimple will be used by default.

+
+ + +

The following parameters are available:

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
ParameterTypeRequiredDefaultDescription
hoststringNolocalhostServer on which AMQP instance is located.
portintegerNo5672Port on which the instance is bound.
vhoststringNo/The name of the "virtual host".
loginstringNoguestLogin used to connect to the AMQP server.
passwordstringNoguestPassword used to connect to the AMQP server.
exchangeNamestringYes-Name of AMQP exchange which used to routing logs.
exchangeTypestringNodirectType of AMQP exchange.
routingKeystringYes-Routing key which used to routing logs.
connectionTimeoutfloatNo0.5Connection timeout in seconds.
contentTypestringNotext/plainContent-type header.
contentEncodingstringNoUTF-8Content-encoding header.
flushOnShutdownbooleanNofalseSend logs immediately or stash it and send on shutdown.
+
+ + +

This example shows how to configure LoggerAppenderAMQP to log to a remote server.

+ +
+
    +
  • XML
  • +
  • PHP
  • +
+ +
+
+

+    
+        
+        
+        
+        
+        
+        
+        
+    
+    
+        
+    
+
+]]>
+
+
+
 array(
+        'default' => array(
+            'class' => 'LoggerAppenderAMQP',
+            'params' => array(
+                'host' => 'localhost',
+                'port' => 5672,
+                'vhost' => '/',
+                'login' => 'my_login',
+                'password' => 'my_secret_password',
+                'exchangeName' => 'my_exchange',
+                'routingKey' => 'php_application',
+            ),
+        ),
+    ),
+    'rootLogger' => array(
+        'appenders' => array('default'),
+    ),
+);
+]]>
+
+
+
+
+
+ +
diff --git a/src/test/php/appenders/LoggerAppenderAMQPTest.php b/src/test/php/appenders/LoggerAppenderAMQPTest.php new file mode 100644 index 00000000..4360cccd --- /dev/null +++ b/src/test/php/appenders/LoggerAppenderAMQPTest.php @@ -0,0 +1,281 @@ + 'localhost', + 'port' => 5672, + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/logs', + 'exchangeName' => 'logs', + 'exchangeType' => 'direct', + 'contentEncoding' => 'UTF-8', + 'contentType' => 'application/json', + 'routingKey' => 'php_website', + 'flushOnShutdown' => 0, + 'connectionTimeout' => 0.5, + ); + + public function testRequiresLayout() { + $appender = new LoggerAppenderAMQP(); + $this->assertTrue($appender->requiresLayout()); + } + + protected function setUp() { + if (extension_loaded('amqp')) { + $this->appender = $this->createAppender(); + } else { + $this->markTestSkipped( + 'The amqp extension is not available.' + ); + } + } + + protected function tearDown() { + if (extension_loaded('amqp')) { + $this->appender = null; + } + } + + public function testHost() { + $expected = $this->config['host']; + $this->appender->setHost($expected); + $result = $this->appender->getHost(); + $this->assertEquals($expected, $result); + } + + public function testPort() { + $expected = $this->config['port']; + $this->appender->setPort($expected); + $result = $this->appender->getPort(); + $this->assertEquals($expected, $result); + } + + public function testLogin() { + $expected = $this->config['login']; + $this->appender->setLogin($expected); + $result = $this->appender->getLogin(); + $this->assertEquals($expected, $result); + } + + public function testPassword() { + $expected = $this->config['password']; + $this->appender->setPassword($expected); + $result = $this->appender->getPassword(); + $this->assertEquals($expected, $result); + } + + public function testVhost() { + $expected = $this->config['vhost']; + $this->appender->setVhost($expected); + $result = $this->appender->getVhost(); + $this->assertEquals($expected, $result); + } + + public function testExchangeName() { + $expected = $this->config['exchangeName']; + $this->appender->setExchangeName($expected); + $result = $this->appender->getExchangeName(); + $this->assertEquals($expected, $result); + } + + public function testExchangeType() { + $expected = $this->config['exchangeType']; + $this->appender->setExchangeType($expected); + $result = $this->appender->getExchangeType(); + $this->assertEquals($expected, $result); + } + + public function testRoutingKey() { + $expected = $this->config['routingKey']; + $this->appender->setRoutingKey($expected); + $result = $this->appender->getRoutingKey(); + $this->assertEquals($expected, $result); + } + + public function testContentEncoding() { + $expected = $this->config['contentEncoding']; + $this->appender->setContentEncoding($expected); + $result = $this->appender->getContentEncoding(); + $this->assertEquals($expected, $result); + } + + public function testFlushOnShutdown() { + $expected = $this->config['flushOnShutdown']; + $this->appender->setFlushOnShutdown($expected); + $result = $this->appender->getFlushOnShutdown(); + $this->assertEquals($expected, $result); + } + + public function testConnectionTimeout() { + $expected = $this->config['connectionTimeout']; + $this->appender->setConnectionTimeout($expected); + $result = $this->appender->getConnectionTimeout(); + $this->assertEquals($expected, $result); + } + + public function testActivateOptions() { + $mockAppender = $this->createMockAppender(array( + 'createAMQPConnection', + 'createAMQPExchange', + 'setAMQPConnection', + 'setAMQPExchange' + )); + + $AMQPEmptyConnection = new AMQPConnection; + + $mockAppender->expects($this->once()) + ->method('createAMQPConnection') + ->will($this->returnValue($AMQPEmptyConnection)); + + $AMQPExchangeMock = $this->getMockBuilder('AMQPExchange') + ->setMethods(null) + ->disableOriginalConstructor(); + + $mockAppender->expects($this->once()) + ->method('createAMQPExchange') + ->will($this->returnValue($AMQPExchangeMock)); + + $mockAppender->expects($this->once())->method('setAMQPConnection'); + $mockAppender->expects($this->once())->method('setAMQPExchange'); + + $mockAppender->activateOptions(); + } + + /** + * @expectedException PHPUnit_Framework_Error_Warning + */ + public function testActivateOptionsWithInvalidHost() { + $appender = $this->createPreparedAppender(); + $appender->setHost('unexpected-host.i'); + $appender->activateOptions(); + } + + public function testSendLogToAMQP() + { + $expected = "Some short log message"; + $AMQPExchangeMock = new LoggerAppenderAMQP_ExchangeStub; + + $mockAppender = $this->createMockAppender(array('getAMQPExchange')); + $mockAppender->expects($this->once()) + ->method('getAMQPExchange') + ->will($this->returnValue($AMQPExchangeMock)); + + $mockAppender->sendLogToAMQP($expected); + $this->assertEquals($expected, $AMQPExchangeMock->getLastMessage()); + } + + public function testStashLog() + { + $expected = "Some log message"; + $appender = $this->createAppender(); + $appender->stashLog($expected); + + $this->assertEquals(array($expected), PHPUnit_Framework_Assert::readAttribute($appender, 'logsStash')); + } + + public function testProcessLogWithoutStash() + { + $mockAppender = $this->createMockAppender(array('sendLogToAMQP')); + $mockAppender->expects($this->once())->method('sendLogToAMQP'); + $mockAppender->processLog("Some log message", 0); + } + + public function testProcessLogWitStash() + { + $mockAppender = $this->createMockAppender(array('stashLog')); + $mockAppender->expects($this->once())->method('stashLog'); + $mockAppender->processLog("Some log message", 1); + } + + public function testSendLogsArrayToAMQP() + { + $stashedLogs = array("One stashed log"); + $mockAppender = $this->createMockAppender(array('sendLogToAMQP')); + $mockAppender->expects($this->once())->method('sendLogToAMQP'); + $mockAppender->sendLogsArrayToAMQP($stashedLogs); + } + + private function createAppender() + { + return new LoggerAppenderAMQP('amqp_appender'); + } + + private function createPreparedAppender() + { + $appender = $this->createAppender(); + + foreach ($this->config as $option => $value) { + $setter = "set$option"; + $appender->$setter($value); + } + + return $appender; + } + + /** + * @param array $methods + * @return PHPUnit_Framework_MockObject_MockObject | LoggerAppenderAMQP + */ + private function createMockAppender(array $methods = array()) + { + return $this->getMock('LoggerAppenderAMQP', $methods); + } +} + +class LoggerAppenderAMQP_ExchangeStub +{ + protected $stash = array(); + + public function publish($message, $routing_key, $flags = AMQP_NOPARAM, array $attributes = array()) + { + $this->stash[] = $message; + return true; + } + + public function cleanStash() + { + $this->stash = array(); + } + + public function getLastMessage() + { + return array_pop($this->stash); + } +} \ No newline at end of file