Browse files

moving php stomp client to github

  • Loading branch information...
0 parents commit c7e12bf27e6d514f0aeb8dadbbc1da09b82175a5 @dejanb committed Aug 4, 2011
14 README.txt
@@ -0,0 +1,14 @@
+Installing
+
+Obtain the source of the library by downloading the distribution and add its content to your `include_path`. Alternatively, you can grab the source and add `src/main` folder to your `include_path`.
+
+Running Examples
+
+Examples are located in src/examples folder. Before running them, be sure you have installed this library properly and you have started ActiveMQ broker (recommended version 5.3.0) with Stomp connector enabled (http://activemq.apache.org/stomp.html).
+
+You can start by running
+
+ cd examples
+ php first.php
+
+Also, be sure to check comments in the particular examples for some special configuration steps (if needed).
20 build.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0"?>
+
+<project name="stomp-cli" basedir="." default="deploy">
+
+ <target name="clean" description="--> Clean output directories">
+ <delete dir="target"/>
+ </target>
+
+ <target name="deploy" depends="clean" description="--> Deploy library">
+ <mkdir dir="target"/>
+ <tar destfile="target/stomp-php-1.1-SNAPSHOT.tar.gz" compression="gzip">
+ <fileset dir="src/main/">
+ <include name="**/*"/>
+ </fileset>
+ <tarfileset dir="src/examples" prefix="examples">
+ <include name="**/*"/>
+ </tarfileset>
+ </tar>
+ </target>
+</project>
35 src/examples/activemq-connectivity.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ http://fusesource.com
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans>
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">
+
+ <transportConnectors>
+ <transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>
+ </transportConnectors>
+
+ <sslContext>
+ <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
+ keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts"
+ trustStorePassword="password"/>
+ </sslContext>
+
+ </broker>
+
+</beans>
84 src/examples/activemq-security.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+ Secure ActiveMQ broker
+ For more information, see:
+
+ http://activemq.apache.org/security.html
+
+ To run ActiveMQ with this configuration add xbean:conf/activemq-security.xml to your command
+
+ e.g. $ bin/activemq xbean:conf/activemq-security.xml
+ -->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="locations">
+ <value>file:${activemq.base}/conf/credentials.properties</value>
+ </property>
+ </bean>
+
+ <broker useJmx="true" persistent="false" xmlns="http://activemq.apache.org/schema/core">
+
+ <plugins>
+ <!-- Configure authentication; Username, passwords and groups -->
+ <simpleAuthenticationPlugin>
+ <users>
+ <authenticationUser username="system" password="manager"
+ groups="users,admins"/>
+ <authenticationUser username="user" password="password"
+ groups="users"/>
+ <authenticationUser username="guest" password="password" groups="guests"/>
+ </users>
+ </simpleAuthenticationPlugin>
+
+
+ <!-- Lets configure a destination based authorization mechanism -->
+ <authorizationPlugin>
+ <map>
+ <authorizationMap>
+ <authorizationEntries>
+ <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
+ <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
+ <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+
+ <authorizationEntry queue="TEST.Q" read="guests" write="guests" />
+
+ <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
+ <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
+ <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+
+ <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
+ </authorizationEntries>
+ </authorizationMap>
+ </map>
+ </authorizationPlugin>
+ </plugins>
+
+ <transportConnectors>
+ <transportConnector name="default" uri="stomp://0.0.0.0:61613"/>
+ </transportConnectors>
+
+ </broker>
+
+</beans>
49 src/examples/binary.php
@@ -0,0 +1,49 @@
+<?php
+/**
+ *
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ * http://fusesource.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// include a library
+require_once("Stomp.php");
+require_once("Stomp/Message/Bytes.php");
+// make a connection
+$con = new Stomp("tcp://localhost:61613");
+// connect
+$con->connect();
+// send a message to the queue
+$body = "test";
+$bytesMessage = new StompMessageBytes($body);
+$con->send("/queue/test", $bytesMessage);
+echo "Sending message: ";
+print_r($body . "\n");
+
+$con->subscribe("/queue/test");
+$msg = $con->readFrame();
+
+// extract
+if ( $msg != null) {
+ echo "Received message: ";
+ print_r($msg->body . "\n");
+ // mark the message as received in the queue
+ $con->ack($msg);
+} else {
+ echo "Failed to receive a message\n";
+}
+
+// disconnect
+$con->disconnect();
+?>
51 src/examples/connectivity.php
@@ -0,0 +1,51 @@
+<?php
+/**
+ *
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ * http://fusesource.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ To successfully run this example, you must first start the broker with stomp+ssl enabled.
+ You can do that by executing:
+ $ ${ACTIVEMQ_HOME}/bin/activemq xbean:activemq-connectivity.xml
+ Then you can execute this example with:
+ $ php connectivity.php
+*/
+// include a library
+require_once("Stomp.php");
+// make a connection
+$con = new Stomp("failover://(tcp://localhost:61614,ssl://localhost:61612)?randomize=false");
+// connect
+$con->connect();
+// send a message to the queue
+$con->send("/queue/test", "test");
+echo "Sent message with body 'test'\n";
+// subscribe to the queue
+$con->subscribe("/queue/test");
+// receive a message from the queue
+$msg = $con->readFrame();
+
+// do what you want with the message
+if ( $msg != null) {
+ echo "Received message with body '$msg->body'\n";
+ // mark the message as received in the queue
+ $con->ack($msg);
+} else {
+ echo "Failed to receive a message\n";
+}
+
+// disconnect
+$con->disconnect();
+?>
89 src/examples/durable.php
@@ -0,0 +1,89 @@
+<?php
+/**
+ *
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ * http://fusesource.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// include a library
+require_once("Stomp.php");
+
+// create a producer
+$producer = new Stomp("tcp://localhost:61613");
+// create a consumer
+$consumer = new Stomp("tcp://localhost:61613");
+$consumer->setReadTimeout(1);
+// set clientId on a consumer to make it durable
+$consumer->clientId = "test";
+// connect
+$producer->connect();
+$consumer->connect();
+// subscribe to the topic
+$consumer->subscribe("/topic/test");
+
+sleep(1);
+
+// send a message to the topic
+$producer->send("/topic/test", "test", array('persistent'=>'true'));
+echo "Message 'test' sent to topic\n";
+
+// receive a message from the topic
+$msg = $consumer->readFrame();
+
+// do what you want with the message
+if ( $msg != null) {
+ echo "Message '$msg->body' received from topic\n";
+ $consumer->ack($msg);
+} else {
+ echo "Failed to receive a message\n";
+}
+
+sleep(1);
+
+// disconnect durable consumer
+$consumer->unsubscribe("/topic/test");
+$consumer->disconnect();
+echo "Disconnecting consumer\n";
+
+// send a message while consumer is disconnected
+// note: only persistent messages will be redelivered to the durable consumer
+$producer->send("/topic/test", "test1", array('persistent'=>'true'));
+echo "Message 'test1' sent to topic\n";
+
+
+// reconnect the durable consumer
+$consumer = new Stomp("tcp://localhost:61613");
+$consumer->clientId = "test";
+$consumer->connect();
+$consumer->subscribe("/topic/test");
+echo "Reconnecting consumer\n";
+
+// receive a message from the topic
+$msg = $consumer->readFrame();
+
+// do what you want with the message
+if ( $msg != null) {
+ echo "Message '$msg->body' received from topic\n";
+ $consumer->ack($msg);
+} else {
+ echo "Failed to receive a message\n";
+}
+
+// disconnect
+$consumer->unsubscribe("/topic/test");
+$consumer->disconnect();
+$producer->disconnect();
+
+?>
45 src/examples/first.php
@@ -0,0 +1,45 @@
+<?php
+/**
+ *
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ * http://fusesource.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// include a library
+require_once("Stomp.php");
+// make a connection
+$con = new Stomp("tcp://localhost:61613");
+// connect
+$con->connect();
+// send a message to the queue
+$con->send("/queue/test", "test");
+echo "Sent message with body 'test'\n";
+// subscribe to the queue
+$con->subscribe("/queue/test");
+// receive a message from the queue
+$msg = $con->readFrame();
+
+// do what you want with the message
+if ( $msg != null) {
+ echo "Received message with body '$msg->body'\n";
+ // mark the message as received in the queue
+ $con->ack($msg);
+} else {
+ echo "Failed to receive a message\n";
+}
+
+// disconnect
+$con->disconnect();
+?>
65 src/examples/security.php
@@ -0,0 +1,65 @@
+<?php
+/**
+ *
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ * http://fusesource.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ To successfully run this example, you must first start the broker with security enabled.
+ You can do that by executing:
+ $ ${ACTIVEMQ_HOME}/bin/activemq xbean:activemq-security.xml
+ Then you can execute this example with:
+ $ php security.php
+*/
+// include a library
+require_once("Stomp.php");
+// make a connection
+$con = new Stomp("tcp://localhost:61613");
+// use sync operations
+$con->sync = true;
+// connect
+try {
+ $con->connect("dejan", "test");
+} catch (StompException $e) {
+ echo "dejan cannot connect\n";
+ echo $e->getMessage() . "\n";
+ echo $e->getDetails() . "\n\n\n";
+}
+
+$con->connect("guest", "password");
+
+// send a message to the queue
+try {
+ $con->send("/queue/test", "test");
+ echo "Guest sent message with body 'test'\n";
+} catch (StompException $e) {
+ echo "guest cannot send\n";
+ echo $e->getMessage() . "\n";
+ echo $e->getDetails() . "\n\n\n";
+}
+// disconnect
+$con->disconnect();
+
+
+$con->connect("system", "manager");
+
+// send a message to the queue
+$con->send("/queue/test", "test");
+echo "System manager sent message with body 'test'\n";
+
+// disconnect
+$con->disconnect();
+
+?>
96 src/examples/transactions.php
@@ -0,0 +1,96 @@
+<?php
+/**
+ *
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ * http://fusesource.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// include a library
+require_once("Stomp.php");
+// make a connection
+$con = new Stomp("tcp://localhost:61613");
+// connect
+$con->connect();
+$con->setReadTimeout(1);
+
+// subscribe to the queue
+$con->subscribe("/queue/transactions", array('ack' => 'client','activemq.prefetchSize' => 1 ));
+
+// try to send some messages
+$con->begin("tx1");
+for ($i = 1; $i < 3; $i++) {
+ $con->send("/queue/transactions", $i, array("transaction" => "tx1"));
+}
+// if we abort transaction, messages will not be sent
+$con->abort("tx1");
+
+// now send some messages for real
+$con->begin("tx2");
+echo "Sent messages {\n";
+for ($i = 1; $i < 5; $i++) {
+ echo "\t$i\n";
+ $con->send("/queue/transactions", $i, array("transaction" => "tx2"));
+}
+echo "}\n";
+// they will be available for consumers after commit
+$con->commit("tx2");
+
+// try to receive some messages
+$con->begin("tx3");
+$messages = array();
+for ($i = 1; $i < 3; $i++) {
+ $msg = $con->readFrame();
+ array_push($messages, $msg);
+ $con->ack($msg, "tx3");
+}
+// of we abort transaction, we will "rollback" out acks
+$con->abort("tx3");
+
+$con->begin("tx4");
+// so we need to ack received messages again
+// before we can receive more (prefetch = 1)
+if (count($messages) != 0) {
+ foreach($messages as $msg) {
+ $con->ack($msg, "tx4");
+ }
+}
+// now receive more messages
+for ($i = 1; $i < 3; $i++) {
+ $msg = $con->readFrame();
+ $con->ack($msg, "tx4");
+ array_push($messages, $msg);
+}
+// commit all acks
+$con->commit("tx4");
+
+
+echo "Processed messages {\n";
+foreach($messages as $msg) {
+ echo "\t$msg->body\n";
+}
+echo "}\n";
+
+//ensure there are no more messages in the queue
+$frame = $con->readFrame();
+
+if ($frame === false) {
+ echo "No more messages in the queue\n";
+} else {
+ echo "Warning: some messages still in the queue: $frame\n";
+}
+
+// disconnect
+$con->disconnect();
+?>
51 src/examples/transformation.php
@@ -0,0 +1,51 @@
+<?php
+/**
+ *
+ * Copyright (C) 2009 Progress Software, Inc. All rights reserved.
+ * http://fusesource.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// include a library
+require_once("Stomp.php");
+require_once("Stomp/Message/Map.php");
+// make a connection
+$con = new Stomp("tcp://localhost:61613");
+// connect
+$con->connect();
+// send a message to the queue
+$body = array("city"=>"Belgrade", "name"=>"Dejan");
+$header = array();
+$header['transformation'] = 'jms-map-json';
+$mapMessage = new StompMessageMap($body, $header);
+$con->send("/queue/test", $mapMessage);
+echo "Sending array: ";
+print_r($body);
+
+$con->subscribe("/queue/test", array('transformation' => 'jms-map-json'));
+$msg = $con->readFrame();
+
+// extract
+if ( $msg != null) {
+ echo "Received array: ";
+ print_r($msg->map);
+ // mark the message as received in the queue
+ $con->ack($msg);
+} else {
+ echo "Failed to receive a message\n";
+}
+
+// disconnect
+$con->disconnect();
+?>
613 src/main/Stomp.php
@@ -0,0 +1,613 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+
+require_once 'Stomp/Frame.php';
+
+/**
+ * A Stomp Connection
+ *
+ *
+ * @package Stomp
+ * @author Hiram Chirino <hiram@hiramchirino.com>
+ * @author Dejan Bosanac <dejan@nighttale.net>
+ * @author Michael Caplan <mcaplan@labnet.net>
+ * @version $Revision: 43 $
+ */
+class Stomp
+{
+ /**
+ * Perform request synchronously
+ *
+ * @var boolean
+ */
+ public $sync = false;
+
+ /**
+ * Default prefetch size
+ *
+ * @var int
+ */
+ public $prefetchSize = 1;
+
+ /**
+ * Client id used for durable subscriptions
+ *
+ * @var string
+ */
+ public $clientId = null;
+
+ protected $_brokerUri = null;
+ protected $_socket = null;
+ protected $_hosts = array();
+ protected $_params = array();
+ protected $_subscriptions = array();
+ protected $_defaultPort = 61613;
+ protected $_currentHost = - 1;
+ protected $_attempts = 10;
+ protected $_username = '';
+ protected $_password = '';
+ protected $_sessionId;
+ protected $_read_timeout_seconds = 60;
+ protected $_read_timeout_milliseconds = 0;
+ protected $_connect_timeout_seconds = 60;
+
+ /**
+ * Constructor
+ *
+ * @param string $brokerUri Broker URL
+ * @throws StompException
+ */
+ public function __construct ($brokerUri)
+ {
+ $this->_brokerUri = $brokerUri;
+ $this->_init();
+ }
+ /**
+ * Initialize connection
+ *
+ * @throws StompException
+ */
+ protected function _init ()
+ {
+ $pattern = "|^(([a-zA-Z]+)://)+\(*([a-zA-Z0-9\.:/i,-]+)\)*\??([a-zA-Z0-9=&]*)$|i";
+ if (preg_match($pattern, $this->_brokerUri, $regs)) {
+ $scheme = $regs[2];
+ $hosts = $regs[3];
+ $params = $regs[4];
+ if ($scheme != "failover") {
+ $this->_processUrl($this->_brokerUri);
+ } else {
+ $urls = explode(",", $hosts);
+ foreach ($urls as $url) {
+ $this->_processUrl($url);
+ }
+ }
+ if ($params != null) {
+ parse_str($params, $this->_params);
+ }
+ } else {
+ require_once 'Stomp/Exception.php';
+ throw new StompException("Bad Broker URL {$this->_brokerUri}");
+ }
+ }
+ /**
+ * Process broker URL
+ *
+ * @param string $url Broker URL
+ * @throws StompException
+ * @return boolean
+ */
+ protected function _processUrl ($url)
+ {
+ $parsed = parse_url($url);
+ if ($parsed) {
+ array_push($this->_hosts, array($parsed['host'] , $parsed['port'] , $parsed['scheme']));
+ } else {
+ require_once 'Stomp/Exception.php';
+ throw new StompException("Bad Broker URL $url");
+ }
+ }
+ /**
+ * Make socket connection to the server
+ *
+ * @throws StompException
+ */
+ protected function _makeConnection ()
+ {
+ if (count($this->_hosts) == 0) {
+ require_once 'Stomp/Exception.php';
+ throw new StompException("No broker defined");
+ }
+
+ // force disconnect, if previous established connection exists
+ $this->disconnect();
+
+ $i = $this->_currentHost;
+ $att = 0;
+ $connected = false;
+ $connect_errno = null;
+ $connect_errstr = null;
+
+ while (! $connected && $att ++ < $this->_attempts) {
+ if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') {
+ $i = rand(0, count($this->_hosts) - 1);
+ } else {
+ $i = ($i + 1) % count($this->_hosts);
+ }
+ $broker = $this->_hosts[$i];
+ $host = $broker[0];
+ $port = $broker[1];
+ $scheme = $broker[2];
+ if ($port == null) {
+ $port = $this->_defaultPort;
+ }
+ if ($this->_socket != null) {
+ fclose($this->_socket);
+ $this->_socket = null;
+ }
+ $this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds);
+ if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) {
+ require_once 'Stomp/Exception.php';
+ throw new StompException("Could not connect to $host:$port ($att/{$this->_attempts})");
+ } else if (is_resource($this->_socket)) {
+ $connected = true;
+ $this->_currentHost = $i;
+ break;
+ }
+ }
+ if (! $connected) {
+ require_once 'Stomp/Exception.php';
+ throw new StompException("Could not connect to a broker");
+ }
+ }
+ /**
+ * Connect to server
+ *
+ * @param string $username
+ * @param string $password
+ * @return boolean
+ * @throws StompException
+ */
+ public function connect ($username = '', $password = '')
+ {
+ $this->_makeConnection();
+ if ($username != '') {
+ $this->_username = $username;
+ }
+ if ($password != '') {
+ $this->_password = $password;
+ }
+ $headers = array('login' => $this->_username , 'passcode' => $this->_password);
+ if ($this->clientId != null) {
+ $headers["client-id"] = $this->clientId;
+ }
+ $frame = new StompFrame("CONNECT", $headers);
+ $this->_writeFrame($frame);
+ $frame = $this->readFrame();
+ if ($frame instanceof StompFrame && $frame->command == 'CONNECTED') {
+ $this->_sessionId = $frame->headers["session"];
+ return true;
+ } else {
+ require_once 'Stomp/Exception.php';
+ if ($frame instanceof StompFrame) {
+ throw new StompException("Unexpected command: {$frame->command}", 0, $frame->body);
+ } else {
+ throw new StompException("Connection not acknowledged");
+ }
+ }
+ }
+
+ /**
+ * Check if client session has ben established
+ *
+ * @return boolean
+ */
+ public function isConnected ()
+ {
+ return !empty($this->_sessionId) && is_resource($this->_socket);
+ }
+ /**
+ * Current stomp session ID
+ *
+ * @return string
+ */
+ public function getSessionId()
+ {
+ return $this->_sessionId;
+ }
+ /**
+ * Send a message to a destination in the messaging system
+ *
+ * @param string $destination Destination queue
+ * @param string|StompFrame $msg Message
+ * @param array $properties
+ * @param boolean $sync Perform request synchronously
+ * @return boolean
+ */
+ public function send ($destination, $msg, $properties = array(), $sync = null)
+ {
+ if ($msg instanceof StompFrame) {
+ $msg->headers['destination'] = $destination;
+ if (is_array($properties)) $msg->headers = array_merge($msg->headers, $properties);
+ $frame = $msg;
+ } else {
+ $headers = $properties;
+ $headers['destination'] = $destination;
+ $frame = new StompFrame('SEND', $headers, $msg);
+ }
+ $this->_prepareReceipt($frame, $sync);
+ $this->_writeFrame($frame);
+ return $this->_waitForReceipt($frame, $sync);
+ }
+ /**
+ * Prepair frame receipt
+ *
+ * @param StompFrame $frame
+ * @param boolean $sync
+ */
+ protected function _prepareReceipt (StompFrame $frame, $sync)
+ {
+ $receive = $this->sync;
+ if ($sync !== null) {
+ $receive = $sync;
+ }
+ if ($receive == true) {
+ $frame->headers['receipt'] = md5(microtime());
+ }
+ }
+ /**
+ * Wait for receipt
+ *
+ * @param StompFrame $frame
+ * @param boolean $sync
+ * @return boolean
+ * @throws StompException
+ */
+ protected function _waitForReceipt (StompFrame $frame, $sync)
+ {
+
+ $receive = $this->sync;
+ if ($sync !== null) {
+ $receive = $sync;
+ }
+ if ($receive == true) {
+ $id = (isset($frame->headers['receipt'])) ? $frame->headers['receipt'] : null;
+ if ($id == null) {
+ return true;
+ }
+ $frame = $this->readFrame();
+ if ($frame instanceof StompFrame && $frame->command == 'RECEIPT') {
+ if ($frame->headers['receipt-id'] == $id) {
+ return true;
+ } else {
+ require_once 'Stomp/Exception.php';
+ throw new StompException("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body);
+ }
+ } else {
+ require_once 'Stomp/Exception.php';
+ if ($frame instanceof StompFrame) {
+ throw new StompException("Unexpected command {$frame->command}", 0, $frame->body);
+ } else {
+ throw new StompException("Receipt not received");
+ }
+ }
+ }
+ return true;
+ }
+ /**
+ * Register to listen to a given destination
+ *
+ * @param string $destination Destination queue
+ * @param array $properties
+ * @param boolean $sync Perform request synchronously
+ * @return boolean
+ * @throws StompException
+ */
+ public function subscribe ($destination, $properties = null, $sync = null)
+ {
+ $headers = array('ack' => 'client');
+ $headers['activemq.prefetchSize'] = $this->prefetchSize;
+ if ($this->clientId != null) {
+ $headers["activemq.subcriptionName"] = $this->clientId;
+ }
+ if (isset($properties)) {
+ foreach ($properties as $name => $value) {
+ $headers[$name] = $value;
+ }
+ }
+ $headers['destination'] = $destination;
+ $frame = new StompFrame('SUBSCRIBE', $headers);
+ $this->_prepareReceipt($frame, $sync);
+ $this->_writeFrame($frame);
+ if ($this->_waitForReceipt($frame, $sync) == true) {
+ $this->_subscriptions[$destination] = $properties;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /**
+ * Remove an existing subscription
+ *
+ * @param string $destination
+ * @param array $properties
+ * @param boolean $sync Perform request synchronously
+ * @return boolean
+ * @throws StompException
+ */
+ public function unsubscribe ($destination, $properties = null, $sync = null)
+ {
+ $headers = array();
+ if (isset($properties)) {
+ foreach ($properties as $name => $value) {
+ $headers[$name] = $value;
+ }
+ }
+ $headers['destination'] = $destination;
+ $frame = new StompFrame('UNSUBSCRIBE', $headers);
+ $this->_prepareReceipt($frame, $sync);
+ $this->_writeFrame($frame);
+ if ($this->_waitForReceipt($frame, $sync) == true) {
+ unset($this->_subscriptions[$destination]);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /**
+ * Start a transaction
+ *
+ * @param string $transactionId
+ * @param boolean $sync Perform request synchronously
+ * @return boolean
+ * @throws StompException
+ */
+ public function begin ($transactionId = null, $sync = null)
+ {
+ $headers = array();
+ if (isset($transactionId)) {
+ $headers['transaction'] = $transactionId;
+ }
+ $frame = new StompFrame('BEGIN', $headers);
+ $this->_prepareReceipt($frame, $sync);
+ $this->_writeFrame($frame);
+ return $this->_waitForReceipt($frame, $sync);
+ }
+ /**
+ * Commit a transaction in progress
+ *
+ * @param string $transactionId
+ * @param boolean $sync Perform request synchronously
+ * @return boolean
+ * @throws StompException
+ */
+ public function commit ($transactionId = null, $sync = null)
+ {
+ $headers = array();
+ if (isset($transactionId)) {
+ $headers['transaction'] = $transactionId;
+ }
+ $frame = new StompFrame('COMMIT', $headers);
+ $this->_prepareReceipt($frame, $sync);
+ $this->_writeFrame($frame);
+ return $this->_waitForReceipt($frame, $sync);
+ }
+ /**
+ * Roll back a transaction in progress
+ *
+ * @param string $transactionId
+ * @param boolean $sync Perform request synchronously
+ */
+ public function abort ($transactionId = null, $sync = null)
+ {
+ $headers = array();
+ if (isset($transactionId)) {
+ $headers['transaction'] = $transactionId;
+ }
+ $frame = new StompFrame('ABORT', $headers);
+ $this->_prepareReceipt($frame, $sync);
+ $this->_writeFrame($frame);
+ return $this->_waitForReceipt($frame, $sync);
+ }
+ /**
+ * Acknowledge consumption of a message from a subscription
+ * Note: This operation is always asynchronous
+ *
+ * @param string|StompFrame $messageMessage ID
+ * @param string $transactionId
+ * @return boolean
+ * @throws StompException
+ */
+ public function ack ($message, $transactionId = null)
+ {
+ if ($message instanceof StompFrame) {
+ $headers = $message->headers;
+ if (isset($transactionId)) {
+ $headers['transaction'] = $transactionId;
+ }
+ $frame = new StompFrame('ACK', $headers);
+ $this->_writeFrame($frame);
+ return true;
+ } else {
+ $headers = array();
+ if (isset($transactionId)) {
+ $headers['transaction'] = $transactionId;
+ }
+ $headers['message-id'] = $message;
+ $frame = new StompFrame('ACK', $headers);
+ $this->_writeFrame($frame);
+ return true;
+ }
+ }
+ /**
+ * Graceful disconnect from the server
+ *
+ */
+ public function disconnect ()
+ {
+ $headers = array();
+
+ if ($this->clientId != null) {
+ $headers["client-id"] = $this->clientId;
+ }
+
+ if (is_resource($this->_socket)) {
+ $this->_writeFrame(new StompFrame('DISCONNECT', $headers));
+ fclose($this->_socket);
+ }
+ $this->_socket = null;
+ $this->_sessionId = null;
+ $this->_currentHost = -1;
+ $this->_subscriptions = array();
+ $this->_username = '';
+ $this->_password = '';
+ }
+ /**
+ * Write frame to server
+ *
+ * @param StompFrame $stompFrame
+ */
+ protected function _writeFrame (StompFrame $stompFrame)
+ {
+ if (!is_resource($this->_socket)) {
+ require_once 'Stomp/Exception.php';
+ throw new StompException('Socket connection hasn\'t been established');
+ }
+
+ $data = $stompFrame->__toString();
+ $r = fwrite($this->_socket, $data, strlen($data));
+ if ($r === false || $r == 0) {
+ $this->_reconnect();
+ $this->_writeFrame($stompFrame);
+ }
+ }
+
+ /**
+ * Set timeout to wait for content to read
+ *
+ * @param int $seconds_to_wait Seconds to wait for a frame
+ * @param int $milliseconds Milliseconds to wait for a frame
+ */
+ public function setReadTimeout($seconds, $milliseconds = 0)
+ {
+ $this->_read_timeout_seconds = $seconds;
+ $this->_read_timeout_milliseconds = $milliseconds;
+ }
+
+ /**
+ * Read response frame from server
+ *
+ * @return StompFrame False when no frame to read
+ */
+ public function readFrame ()
+ {
+ if (!$this->hasFrameToRead()) {
+ return false;
+ }
+
+ $rb = 1024;
+ $data = '';
+ $end = false;
+
+ do {
+ $read = fread($this->_socket, $rb);
+ if ($read === false) {
+ $this->_reconnect();
+ return $this->readFrame();
+ }
+ $data .= $read;
+ if (strpos($data, "\x00") !== false) {
+ $end = true;
+ $data = rtrim($data, "\n");
+ }
+ $len = strlen($data);
+ } while ($len < 2 || $end == false);
+
+ list ($header, $body) = explode("\n\n", $data, 2);
+ $header = explode("\n", $header);
+ $headers = array();
+ $command = null;
+ foreach ($header as $v) {
+ if (isset($command)) {
+ list ($name, $value) = explode(':', $v, 2);
+ $headers[$name] = $value;
+ } else {
+ $command = $v;
+ }
+ }
+ $frame = new StompFrame($command, $headers, trim($body));
+ if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
+ require_once 'Stomp/Message/Map.php';
+ return new StompMessageMap($frame);
+ } else {
+ return $frame;
+ }
+ return $frame;
+ }
+
+ /**
+ * Check if there is a frame to read
+ *
+ * @return boolean
+ */
+ public function hasFrameToRead()
+ {
+ $read = array($this->_socket);
+ $write = null;
+ $except = null;
+
+ $has_frame_to_read = @stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds);
+
+ if ($has_frame_to_read !== false)
+ $has_frame_to_read = count($read);
+
+
+ if ($has_frame_to_read === false) {
+ throw new StompException('Check failed to determine if the socket is readable');
+ } else if ($has_frame_to_read > 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Reconnects and renews subscriptions (if there were any)
+ * Call this method when you detect connection problems
+ */
+ protected function _reconnect ()
+ {
+ $subscriptions = $this->_subscriptions;
+
+ $this->connect($this->_username, $this->_password);
+ foreach ($subscriptions as $dest => $properties) {
+ $this->subscribe($dest, $properties);
+ }
+ }
+ /**
+ * Graceful object desruction
+ *
+ */
+ public function __destruct()
+ {
+ $this->disconnect();
+ }
+}
+?>
55 src/main/Stomp/Exception.php
@@ -0,0 +1,55 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+
+/**
+ * A Stomp Connection
+ *
+ *
+ * @package Stomp
+ */
+class StompException extends Exception
+{
+ protected $_details;
+
+ /**
+ * Constructor
+ *
+ * @param string $message Error message
+ * @param int $code Error code
+ * @param string $details Stomp server error details
+ */
+ public function __construct($message = null, $code = 0, $details = '')
+ {
+ $this->_details = $details;
+
+ parent::__construct($message, $code);
+ }
+
+ /**
+ * Stomp server error details
+ *
+ * @return string
+ */
+ public function getDetails()
+ {
+ return $this->_details;
+ }
+}
+?>
76 src/main/Stomp/Frame.php
@@ -0,0 +1,76 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+
+/**
+ * Stomp Frames are messages that are sent and received on a stomp connection.
+ *
+ * @package Stomp
+ */
+class StompFrame
+{
+ public $command;
+ public $headers = array();
+ public $body;
+
+ /**
+ * Constructor
+ *
+ * @param string $command
+ * @param array $headers
+ * @param string $body
+ */
+ public function __construct ($command = null, $headers = null, $body = null)
+ {
+ $this->_init($command, $headers, $body);
+ }
+
+ protected function _init ($command = null, $headers = null, $body = null)
+ {
+ $this->command = $command;
+ if ($headers != null) {
+ $this->headers = $headers;
+ }
+ $this->body = $body;
+
+ if ($this->command == 'ERROR') {
+ require_once 'Exception.php';
+ throw new StompException($this->headers['message'], 0, $this->body);
+ }
+ }
+
+ /**
+ * Convert frame to transportable string
+ *
+ * @return string
+ */
+ public function __toString()
+ {
+ $data = $this->command . "\n";
+
+ foreach ($this->headers as $name => $value) {
+ $data .= $name . ": " . $value . "\n";
+ }
+
+ $data .= "\n";
+ $data .= $this->body;
+ return $data .= "\x00";
+ }
+}
+?>
35 src/main/Stomp/Message.php
@@ -0,0 +1,35 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+
+require_once 'Stomp/Frame.php';
+
+/**
+ * Basic text stomp message
+ *
+ * @package Stomp
+ */
+class StompMessage extends StompFrame
+{
+ public function __construct ($body, $headers = null)
+ {
+ $this->_init("SEND", $headers, $body);
+ }
+}
+?>
45 src/main/Stomp/Message/Bytes.php
@@ -0,0 +1,45 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+
+require_once 'Stomp/Message.php';
+
+/**
+ * Message that contains a stream of uninterpreted bytes
+ *
+ * @package Stomp
+ */
+class StompMessageBytes extends StompMessage
+{
+ /**
+ * Constructor
+ *
+ * @param string $body
+ * @param array $headers
+ */
+ function __construct ($body, $headers = null)
+ {
+ $this->_init("SEND", $headers, $body);
+ if ($this->headers == null) {
+ $this->headers = array();
+ }
+ $this->headers['content-length'] = count(unpack("c*", $body));
+ }
+}
+?>
53 src/main/Stomp/Message/Map.php
@@ -0,0 +1,53 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+
+require_once 'Stomp/Message.php';
+
+/**
+ * Message that contains a set of name-value pairs
+ *
+ * @package Stomp
+ */
+class StompMessageMap extends StompMessage
+{
+ public $map;
+
+ /**
+ * Constructor
+ *
+ * @param StompFrame|string $msg
+ * @param array $headers
+ */
+ function __construct ($msg, $headers = null)
+ {
+ if ($msg instanceof StompFrame) {
+ $this->_init($msg->command, $msg->headers, $msg->body);
+ $this->map = json_decode($msg->body, true);
+ } else {
+ $this->_init("SEND", $headers, $msg);
+ if ($this->headers == null) {
+ $this->headers = array();
+ }
+ $this->headers['transformation'] = 'jms-map-json';
+ $this->body = json_encode($msg);
+ }
+ }
+}
+?>
62 src/test/StompFailoverTest.php
@@ -0,0 +1,62 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+require_once '../main/Stomp.php';
+require_once 'PHPUnit/Framework/TestCase.php';
+/**
+ * Stomp test case.
+ *
+ * @package Stomp
+ * @author Michael Caplan <mcaplan@labnet.net>
+ * @version $Revision: 35 $
+ */
+class StompFailoverTest extends PHPUnit_Framework_TestCase
+{
+ /**
+ * @var Stomp
+ */
+ private $Stomp;
+ /**
+ * Prepares the environment before running a test.
+ */
+ protected function setUp ()
+ {
+ parent::setUp();
+
+ $stomp_path = realpath('../../main/php5/');
+ set_include_path(get_include_path() . PATH_SEPARATOR . $stomp_path);
+
+ $this->Stomp = new Stomp('failover://(tcp://localhost:61614,tcp://localhost:61613)?randomize=false');
+ }
+ /**
+ * Cleans up the environment after running a test.
+ */
+ protected function tearDown ()
+ {
+ $this->Stomp = null;
+ parent::tearDown();
+ }
+ /**
+ * Tests Stomp->connect()
+ */
+ public function testFailoverConnect ()
+ {
+ $this->assertTrue($this->Stomp->connect());
+ }
+}
+
245 src/test/StompSslTest.php
@@ -0,0 +1,245 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+require_once '../main/Stomp.php';
+require_once 'PHPUnit/Framework/TestCase.php';
+/**
+ * Stomp test case.
+ * @package Stomp
+ * @author Michael Caplan <mcaplan@labnet.net>
+ * @version $Revision: 38 $
+ */
+class StompSslTest extends PHPUnit_Framework_TestCase
+{
+ /**
+ * @var Stomp
+ */
+ private $Stomp;
+ private $broker = 'ssl://localhost:61612';
+ private $queue = '/queue/test';
+ /**
+ * Prepares the environment before running a test.
+ */
+ protected function setUp ()
+ {
+ parent::setUp();
+
+ $stomp_path = realpath('../../main/php5/');
+ set_include_path(get_include_path() . PATH_SEPARATOR . $stomp_path);
+
+ $this->Stomp = new Stomp($this->broker);
+ $this->Stomp->sync = false;
+ }
+ /**
+ * Cleans up the environment after running a test.
+ */
+ protected function tearDown ()
+ {
+ $this->Stomp = null;
+ parent::tearDown();
+ }
+ /**
+ * Tests Stomp->abort()
+ */
+ public function testAbort ()
+ {
+ // TODO Auto-generated StompTest->testAbort()
+ $this->markTestIncomplete("abort test not implemented");
+ }
+ /**
+ * Tests Stomp->hasFrameToRead()
+ *
+ */
+ public function testHasFrameToRead()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+
+ $this->Stomp->setReadTimeout(5);
+
+ $this->assertFalse($this->Stomp->hasFrameToRead(), 'Has frame to read when non expected');
+
+ $this->Stomp->send($this->queue, 'testHasFrameToRead');
+
+ $this->Stomp->subscribe($this->queue, array('ack' => 'client','activemq.prefetchSize' => 1 ));
+
+ $this->assertTrue($this->Stomp->hasFrameToRead(), 'Did not have frame to read when expected');
+
+ $frame = $this->Stomp->readFrame();
+
+ $this->assertTrue($frame instanceof StompFrame, 'Frame expected');
+
+ $this->Stomp->ack($frame);
+
+ $this->Stomp->disconnect();
+
+ $this->Stomp->setReadTimeout(60);
+ }
+ /**
+ * Tests Stomp->ack()
+ */
+ public function testAck ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+
+ $messages = array();
+
+ for ($x = 0; $x < 100; ++$x) {
+ $this->Stomp->send($this->queue, $x);
+ $messages[$x] = 'sent';
+ }
+
+ $this->Stomp->disconnect();
+
+ for ($y = 0; $y < 100; $y += 10) {
+
+ $this->Stomp->connect();
+
+ $this->Stomp->subscribe($this->queue, array('ack' => 'client','activemq.prefetchSize' => 1 ));
+
+ for ($x = $y; $x < $y + 10; ++$x) {
+ $frame = $this->Stomp->readFrame();
+ $this->assertTrue($frame instanceof StompFrame);
+ $this->assertArrayHasKey($frame->body, $messages, $frame->body . ' is not in the list of messages to ack');
+ $this->assertEquals('sent', $messages[$frame->body], $frame->body . ' has been marked acked, but has been received again.');
+ $messages[$frame->body] = 'acked';
+
+ $this->assertTrue($this->Stomp->ack($frame), "Unable to ack {$frame->headers['message-id']}");
+
+ }
+
+ $this->Stomp->disconnect();
+
+ }
+ }
+ /**
+ * Tests Stomp->begin()
+ */
+ public function testBegin ()
+ {
+ // TODO Auto-generated StompTest->testBegin()
+ $this->markTestIncomplete("begin test not implemented");
+ $this->Stomp->begin(/* parameters */);
+ }
+ /**
+ * Tests Stomp->commit()
+ */
+ public function testCommit ()
+ {
+ // TODO Auto-generated StompTest->testCommit()
+ $this->markTestIncomplete("commit test not implemented");
+ $this->Stomp->commit(/* parameters */);
+ }
+ /**
+ * Tests Stomp->connect()
+ */
+ public function testConnect ()
+ {
+ $this->assertTrue($this->Stomp->connect());
+ $this->assertTrue($this->Stomp->isConnected());
+ }
+ /**
+ * Tests Stomp->disconnect()
+ */
+ public function testDisconnect ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertTrue($this->Stomp->isConnected());
+ $this->Stomp->disconnect();
+ $this->assertFalse($this->Stomp->isConnected());
+ }
+ /**
+ * Tests Stomp->getSessionId()
+ */
+ public function testGetSessionId ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertNotNull($this->Stomp->getSessionId());
+ }
+ /**
+ * Tests Stomp->isConnected()
+ */
+ public function testIsConnected ()
+ {
+ $this->Stomp->connect();
+ $this->assertTrue($this->Stomp->isConnected());
+ $this->Stomp->disconnect();
+ $this->assertFalse($this->Stomp->isConnected());
+ }
+ /**
+ * Tests Stomp->readFrame()
+ */
+ public function testReadFrame ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->Stomp->send($this->queue, 'testReadFrame');
+ $this->Stomp->subscribe($this->queue);
+ $frame = $this->Stomp->readFrame();
+ $this->assertTrue($frame instanceof StompFrame);
+ $this->assertEquals('testReadFrame', $frame->body, 'Body of test frame does not match sent message');
+ $this->Stomp->ack($frame);
+ $this->Stomp->unsubscribe($this->queue);
+ }
+ /**
+ * Tests Stomp->send()
+ */
+ public function testSend ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertTrue($this->Stomp->send($this->queue, 'testSend'));
+ $this->Stomp->subscribe($this->queue);
+ $frame = $this->Stomp->readFrame();
+ $this->assertEquals('testSend', $frame->body, 'Body of test frame does not match sent message');
+ $this->Stomp->ack($frame);
+ $this->Stomp->unsubscribe($this->queue);
+ }
+ /**
+ * Tests Stomp->subscribe()
+ */
+ public function testSubscribe ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertTrue($this->Stomp->subscribe($this->queue));
+ $this->Stomp->unsubscribe($this->queue);
+ }
+ /**
+ * Tests Stomp->unsubscribe()
+ */
+ public function testUnsubscribe ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->Stomp->subscribe($this->queue);
+ $this->assertTrue($this->Stomp->unsubscribe($this->queue));
+ }
+}
+
50 src/test/StompSuite.php
@@ -0,0 +1,50 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+require_once 'PHPUnit/Framework/TestSuite.php';
+require_once 'StompFailoverTest.php';
+require_once 'StompTest.php';
+require_once 'StompSslTest.php';
+/**
+ * Static test suite.
+ *
+ * @package Stomp
+ * @author Michael Caplan <mcaplan@labnet.net>
+ * @version $Revision: 23 $
+ */
+class StompSuite extends PHPUnit_Framework_TestSuite
+{
+ /**
+ * Constructs the test suite handler.
+ */
+ public function __construct ()
+ {
+ $this->setName('StompSuite');
+ $this->addTestSuite('StompFailoverTest');
+ $this->addTestSuite('StompTest');
+ $this->addTestSuite('StompSslTest');
+ }
+ /**
+ * Creates the suite.
+ */
+ public static function suite ()
+ {
+ return new self();
+ }
+}
+
340 src/test/StompTest.php
@@ -0,0 +1,340 @@
+<?php
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/* vim: set expandtab tabstop=3 shiftwidth=3: */
+require_once '../main/Stomp.php';
+require_once '../main/Stomp/Message/Map.php';
+require_once '../main/Stomp/Message/Bytes.php';
+require_once 'PHPUnit/Framework/TestCase.php';
+/**
+ * Stomp test case.
+ * @package Stomp
+ * @author Michael Caplan <mcaplan@labnet.net>
+ * @author Dejan Bosanac <dejan@nighttale.net>
+ * @version $Revision: 40 $
+ */
+class StompTest extends PHPUnit_Framework_TestCase
+{
+ /**
+ * @var Stomp
+ */
+ private $Stomp;
+ private $broker = 'tcp://localhost:61613';
+ private $queue = '/queue/test';
+ private $topic = '/topic/test';
+ /**
+ * Prepares the environment before running a test.
+ */
+ protected function setUp ()
+ {
+ parent::setUp();
+
+ $stomp_path = realpath('../main/');
+ set_include_path(get_include_path() . PATH_SEPARATOR . $stomp_path);
+
+ $this->Stomp = new Stomp($this->broker);
+ $this->Stomp->sync = false;
+ }
+ /**
+ * Cleans up the environment after running a test.
+ */
+ protected function tearDown ()
+ {
+ $this->Stomp = null;
+ parent::tearDown();
+ }
+
+ /**
+ * Tests Stomp->hasFrameToRead()
+ *
+ */
+ public function testHasFrameToRead()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+
+ $this->Stomp->setReadTimeout(5);
+
+ $this->assertFalse($this->Stomp->hasFrameToRead(), 'Has frame to read when non expected');
+
+ $this->Stomp->send($this->queue, 'testHasFrameToRead');
+
+ $this->Stomp->subscribe($this->queue, array('ack' => 'client','activemq.prefetchSize' => 1 ));
+
+ $this->assertTrue($this->Stomp->hasFrameToRead(), 'Did not have frame to read when expected');
+
+ $frame = $this->Stomp->readFrame();
+
+ $this->assertTrue($frame instanceof StompFrame, 'Frame expected');
+
+ $this->Stomp->ack($frame);
+
+ $this->Stomp->disconnect();
+
+ $this->Stomp->setReadTimeout(60);
+ }
+ /**
+ * Tests Stomp->ack()
+ */
+ public function testAck ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+
+ $messages = array();
+
+ for ($x = 0; $x < 100; ++$x) {
+ $this->Stomp->send($this->queue, $x);
+ $messages[$x] = 'sent';
+ }
+
+ $this->Stomp->disconnect();
+
+ for ($y = 0; $y < 100; $y += 10) {
+
+ $this->Stomp->connect();
+
+ $this->Stomp->subscribe($this->queue, array('ack' => 'client','activemq.prefetchSize' => 1 ));
+
+ for ($x = $y; $x < $y + 10; ++$x) {
+ $frame = $this->Stomp->readFrame();
+ $this->assertTrue($frame instanceof StompFrame);
+ $this->assertArrayHasKey($frame->body, $messages, $frame->body . ' is not in the list of messages to ack');
+ $this->assertEquals('sent', $messages[$frame->body], $frame->body . ' has been marked acked, but has been received again.');
+ $messages[$frame->body] = 'acked';
+
+ $this->assertTrue($this->Stomp->ack($frame), "Unable to ack {$frame->headers['message-id']}");
+
+ }
+
+ $this->Stomp->disconnect();
+
+ }
+
+ $un_acked_messages = array();
+
+ foreach ($messages as $key => $value) {
+ if ($value == 'sent') {
+ $un_acked_messages[] = $key;
+ }
+ }
+
+ $this->assertEquals(0, count($un_acked_messages), 'Remaining messages to ack' . var_export($un_acked_messages, true));
+ }
+ /**
+ * Tests Stomp->abort()
+ */
+ public function testAbort()
+ {
+ $this->Stomp->setReadTimeout(1);
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->Stomp->begin("tx1");
+ $this->assertTrue($this->Stomp->send($this->queue, 'testSend', array("transaction" => "tx1")));
+ $this->Stomp->abort("tx1");
+
+ $this->Stomp->subscribe($this->queue);
+ $frame = $this->Stomp->readFrame();
+ $this->assertFalse($frame);
+ $this->Stomp->unsubscribe($this->queue);
+ $this->Stomp->disconnect();
+ }
+
+ /**
+ * Tests Stomp->connect()
+ */
+ public function testConnect ()
+ {
+ $this->assertTrue($this->Stomp->connect());
+ $this->assertTrue($this->Stomp->isConnected());
+ }
+ /**
+ * Tests Stomp->disconnect()
+ */
+ public function testDisconnect ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertTrue($this->Stomp->isConnected());
+ $this->Stomp->disconnect();
+ $this->assertFalse($this->Stomp->isConnected());
+ }
+ /**
+ * Tests Stomp->getSessionId()
+ */
+ public function testGetSessionId ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertNotNull($this->Stomp->getSessionId());
+ }
+ /**
+ * Tests Stomp->isConnected()
+ */
+ public function testIsConnected ()
+ {
+ $this->Stomp->connect();
+ $this->assertTrue($this->Stomp->isConnected());
+ $this->Stomp->disconnect();
+ $this->assertFalse($this->Stomp->isConnected());
+ }
+ /**
+ * Tests Stomp->readFrame()
+ */
+ public function testReadFrame ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->Stomp->send($this->queue, 'testReadFrame');
+ $this->Stomp->subscribe($this->queue);
+ $frame = $this->Stomp->readFrame();
+ $this->assertTrue($frame instanceof StompFrame);
+ $this->assertEquals('testReadFrame', $frame->body, 'Body of test frame does not match sent message');
+ $this->Stomp->ack($frame);
+ $this->Stomp->unsubscribe($this->queue);
+ }
+ /**
+ * Tests Stomp->send()
+ */
+ public function testSend ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertTrue($this->Stomp->send($this->queue, 'testSend'));
+ $this->Stomp->subscribe($this->queue);
+ $frame = $this->Stomp->readFrame();
+ $this->assertTrue($frame instanceof StompFrame);
+ $this->assertEquals('testSend', $frame->body, 'Body of test frame does not match sent message');
+ $this->Stomp->ack($frame);
+ $this->Stomp->unsubscribe($this->queue);
+ }
+ /**
+ * Tests Stomp->subscribe()
+ */
+ public function testSubscribe ()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $this->assertTrue($this->Stomp->subscribe($this->queue));
+ $this->Stomp->unsubscribe($this->queue);
+ }
+
+ /**
+ * Tests Stomp message transformation - json map
+ */
+ public function testJsonMapTransformation()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $body = array("city"=>"Belgrade", "name"=>"Dejan");
+ $header = array();
+ $header['transformation'] = 'jms-map-json';
+ $mapMessage = new StompMessageMap($body, $header);
+ $this->Stomp->send($this->queue, $mapMessage);
+
+ $this->Stomp->subscribe($this->queue, array('transformation' => 'jms-map-json'));
+ $msg = $this->Stomp->readFrame();
+ $this->assertTrue($msg instanceOf StompMessageMap);
+ $this->assertEquals($msg->map, $body);
+ $this->Stomp->ack($msg);
+ $this->Stomp->disconnect();
+ }
+
+ /**
+ * Tests Stomp byte messages
+ */
+ public function testByteMessages()
+ {
+ if (! $this->Stomp->isConnected()) {
+ $this->Stomp->connect();
+ }
+ $body = "test";
+ $mapMessage = new StompMessageBytes($body);
+ $this->Stomp->send($this->queue, $mapMessage);
+
+ $this->Stomp->subscribe($this->queue);
+ $msg = $this->Stomp->readFrame();
+ $this->assertEquals($msg->body, $body);
+ $this->Stomp->ack($msg);
+ $this->Stomp->disconnect();
+ }
+
+ /**
+ * Tests Stomp->unsubscribe()
+ */