Skip to content

Commit

Permalink
Fixed hardcodings in the phing remote deploy script
Browse files Browse the repository at this point in the history
Set  the ackBuffer  to  0 in  multi-web  demo to  avoid deadlock  with
prefetch-count

Fixes in Channel when working with PChannels

Removed debug logging code
  • Loading branch information
BraveSirRobin committed Jan 10, 2012
1 parent e4bffb0 commit 3828927
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 24 deletions.
4 changes: 2 additions & 2 deletions build.properties
Expand Up @@ -30,10 +30,10 @@ gencode.stripWhitespace=true


# Deploy to web server params
webdepl.basedir=/var/www
webdepl.basedir=/var/amq.phpvm

webdepl.libdir=amqphp-libs

webdepl.viewdir=amqphp-view

webdepl.confdir=amqphp-config
webdepl.confdir=amqphp-config
12 changes: 8 additions & 4 deletions build.xml
Expand Up @@ -175,16 +175,20 @@ Prepare the NSPF and CPF source trees.
<exec command="ssh phpvm mkdir -p ${webdepl.basedir}/${webdepl.viewdir} ${webdepl.basedir}/${webdepl.libdir} ${webdepl.basedir}/${webdepl.confdir}" />

<!-- controllers -->
<exec command="scp demos/pconnections/web-single-process-test.php demos/pconnections/web-multi-process-test.php phpvm:/var/www" dir="${project.basedir}"/>
<exec command="scp demos/pconnections/web-single-process-test.php demos/pconnections/web-multi-process-test.php phpvm:${webdepl.basedir}"
dir="${project.basedir}"/>

<!-- views -->
<exec command="scp demos/pconnections/web-controls-multi.phtml demos/pconnections/web-controls-single.phtml phpvm:${webdepl.basedir}/${webdepl.viewdir}" dir="${project.basedir}"/>
<exec command="scp demos/pconnections/web-controls-multi.phtml demos/pconnections/web-controls-single.phtml phpvm:${webdepl.basedir}/${webdepl.viewdir}"
dir="${project.basedir}"/>

<!-- libs -->
<exec command="scp demos/Setup.php demos/pconnections/web-common.php demos/class-loader.php phpvm:${webdepl.basedir}/${webdepl.libdir}" dir="${project.basedir}"/>
<exec command="scp demos/Setup.php demos/pconnections/web-common.php demos/class-loader.php phpvm:${webdepl.basedir}/${webdepl.libdir}"
dir="${project.basedir}"/>

<!-- configs -->
<exec command="scp demos/configs/web-multi.xml demos/configs/broker-common-setup.xml phpvm:${webdepl.basedir}/${webdepl.confdir}" dir="${project.basedir}"/>
<exec command="scp demos/configs/web-multi.xml demos/configs/broker-common-setup.xml phpvm:${webdepl.basedir}/${webdepl.confdir}"
dir="${project.basedir}"/>
</target>

<target name="deploy-web-libs-only">
Expand Down
2 changes: 2 additions & 0 deletions demos/configs/web-multi.xml
Expand Up @@ -21,6 +21,7 @@
<set_properties>
<suspendOnSerialize k="boolean">true</suspendOnSerialize>
<resumeOnHydrate k="boolean">true</resumeOnHydrate>
<ackBuffer k="integer">0</ackBuffer>
</set_properties>
<consumer>
<impl>DemoPConsumer</impl>
Expand Down Expand Up @@ -69,6 +70,7 @@
<set_properties>
<suspendOnSerialize k="boolean">true</suspendOnSerialize>
<resumeOnHydrate k="boolean">true</resumeOnHydrate>
<ackBuffer k="integer">0</ackBuffer>
</set_properties>
<consumer>
<impl>DemoPConsumer</impl>
Expand Down
16 changes: 8 additions & 8 deletions src/amqphp/Channel.php
Expand Up @@ -106,12 +106,12 @@ class Channel
* receive a message on one channel and then acknowledge it on
* another.</rule>
*/
private $pendingAcks = array();
private $numPendAcks = 0; // Avoid re-counting lots.
protected $pendingAcks = array();
protected $numPendAcks = 0; // Avoid re-counting lots.

/** Used to track ack response, one of CONSUMER_ACK,
* CONSUMER_REJECT, CONSUMER_DROP */
private $ackFlag;
protected $ackFlag;


/**
Expand Down Expand Up @@ -350,13 +350,13 @@ private function deliverConsumerMessage ($meth) {
foreach ($response as $resp) {
switch ($resp) {
case CONSUMER_ACK:
if (! $consParams['no-ack']) {
if (! array_key_exists('no-ack', $consParams) || ! $consParams['no-ack']) {
$this->ack($meth, CONSUMER_ACK);
}
break;
case CONSUMER_DROP:
case CONSUMER_REJECT:
if (! $consParams['no-ack']) {
if (! array_key_exists('no-ack', $consParams) || ! $consParams['no-ack']) {
$this->ack($meth, $resp);
}
break;
Expand Down Expand Up @@ -402,16 +402,16 @@ private function flushAcks () {
}
switch ($this->ackFlag) {
case CONSUMER_ACK:
printf(" (amqp\Channel) - flush acks for messages %s\n", implode(',', $this->pendingAcks));
//printf(" (amqp\Channel) - flush acks for messages %s\n", implode(',', $this->pendingAcks));
$ack = $this->basic('ack', array('delivery-tag' => array_pop($this->pendingAcks),
'multiple' => true));
$this->invoke($ack);
break;
case CONSUMER_REJECT:
case CONSUMER_DROP:
printf(" (amqp\Channel) - flush %s for messages %s\n",
/*printf(" (amqp\Channel) - flush %s for messages %s\n",
($this->ackFlag == CONSUMER_REJECT) ? 'rejects' : 'drops',
implode(',', $this->pendingAcks));
implode(',', $this->pendingAcks));*/

$rej = $this->basic('nack', array('delivery-tag' => array_pop($this->pendingAcks),
'multiple' => true,
Expand Down
2 changes: 0 additions & 2 deletions src/amqphp/Connection.php
Expand Up @@ -577,8 +577,6 @@ function doSelectRead () {
} else if ($buff === '') {
$this->blocking = false;
throw new \Exception("Empty read in blocking select loop, socket error:\n" . $this->sock->strError(), 9864);
} else {
printf(" (amqp\Connection) doSelectRead got no methods from string [length=%d]:\n%s\n", strlen($buff), wire\Hexdump::hexdump($buff));
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/amqphp/persistent/PChannel.php
Expand Up @@ -45,7 +45,8 @@ class PChannel extends \amqphp\Channel implements \Serializable
private static $PersProps = array('chanId', 'flow', 'frameMax', 'confirmSeqs',
'confirmSeq', 'confirmMode', 'isOpen',
'callbackHandler', 'suspendOnSerialize',
'resumeOnHydrate');
'resumeOnHydrate', 'ackBuffer', 'pendingAcks',
'numPendAcks', 'ackFlag');

function serialize () {
$data = array();
Expand All @@ -70,7 +71,7 @@ function unserialize ($data) {
$this->$p = $data[$p];
}
foreach ($data['consumers'] as $i => $c) {
$this->consumers[$i] = array($c[0], $c[1], $c[2]);
$this->consumers[$i] = array($c[0], $c[1], $c[2], $c[3]);
}
}

Expand Down
12 changes: 6 additions & 6 deletions tests/forker.xml
Expand Up @@ -16,13 +16,13 @@
<saveMessagesDir></saveMessagesDir><!-- Set to empty for no message saving -->

<!-- Test consumer settings -->
<numConsumers>4</numConsumers>
<numProducers>20</numProducers>
<numConsumers>3</numConsumers>
<numProducers>6</numProducers>
<consumerClass>ForkerConsumer</consumerClass>
<producerClass>ForkerProducer</producerClass>
<consumerChannels>1</consumerChannels>
<consumerPrefetch>0</consumerPrefetch>
<consumersPerChannel>1</consumersPerChannel>
<consumerChannels>2</consumerChannels>
<consumerPrefetch>10</consumerPrefetch>
<consumersPerChannel>4</consumersPerChannel>

<!-- Test producer settings. Message bounds are in bytes. -->
<prodSmallMsgPercent>20</prodSmallMsgPercent>
Expand All @@ -31,7 +31,7 @@
<largeMsgMin>800</largeMsgMin>
<largeMsgMax>800</largeMsgMax>
<prodNumLoops>0</prodNumLoops><!-- Set to zero for open-ended run. -->
<prodSleepMillis>8</prodSleepMillis>
<prodSleepMillis>16</prodSleepMillis>
<prodForceRead>1</prodForceRead>


Expand Down

0 comments on commit 3828927

Please sign in to comment.