Skip to content

Commit

Permalink
Update v8 commands with latest changes from AMQ trunk.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-cpp/trunk@1142339 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Timothy A. Bish committed Jul 2, 2011
1 parent 65636db commit f4cd155
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
34 changes: 33 additions & 1 deletion activemq-cpp/src/main/activemq/commands/ConnectionControl.cpp
Expand Up @@ -40,7 +40,7 @@ using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
ConnectionControl::ConnectionControl()
: BaseCommand(), close(false), exit(false), faultTolerant(false), resume(false), suspend(false), connectedBrokers(""), reconnectTo(""),
rebalanceConnection(false) {
rebalanceConnection(false), token() {

}

Expand Down Expand Up @@ -85,6 +85,7 @@ void ConnectionControl::copyDataStructure( const DataStructure* src ) {
this->setConnectedBrokers( srcPtr->getConnectedBrokers() );
this->setReconnectTo( srcPtr->getReconnectTo() );
this->setRebalanceConnection( srcPtr->isRebalanceConnection() );
this->setToken( srcPtr->getToken() );
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -116,6 +117,17 @@ std::string ConnectionControl::toString() const {
stream << "ReconnectTo = " << this->getReconnectTo();
stream << ", ";
stream << "RebalanceConnection = " << this->isRebalanceConnection();
stream << ", ";
stream << "Token = ";
if( this->getToken().size() > 0 ) {
stream << "[";
for( size_t itoken = 0; itoken < this->getToken().size(); ++itoken ) {
stream << this->getToken()[itoken] << ",";
}
stream << "]";
} else {
stream << "NULL";
}
stream << " }";

return stream.str();
Expand Down Expand Up @@ -158,6 +170,11 @@ bool ConnectionControl::equals( const DataStructure* value ) const {
if( this->isRebalanceConnection() != valuePtr->isRebalanceConnection() ) {
return false;
}
for( size_t itoken = 0; itoken < this->getToken().size(); ++itoken ) {
if( this->getToken()[itoken] != valuePtr->getToken()[itoken] ) {
return false;
}
}
if( !BaseCommand::equals( value ) ) {
return false;
}
Expand Down Expand Up @@ -254,6 +271,21 @@ void ConnectionControl::setRebalanceConnection( bool rebalanceConnection ) {
this->rebalanceConnection = rebalanceConnection;
}

////////////////////////////////////////////////////////////////////////////////
const std::vector<unsigned char>& ConnectionControl::getToken() const {
return token;
}

////////////////////////////////////////////////////////////////////////////////
std::vector<unsigned char>& ConnectionControl::getToken() {
return token;
}

////////////////////////////////////////////////////////////////////////////////
void ConnectionControl::setToken( const std::vector<unsigned char>& token ) {
this->token = token;
}

////////////////////////////////////////////////////////////////////////////////
decaf::lang::Pointer<commands::Command> ConnectionControl::visit( activemq::state::CommandVisitor* visitor ) {

Expand Down
5 changes: 5 additions & 0 deletions activemq-cpp/src/main/activemq/commands/ConnectionControl.h
Expand Up @@ -54,6 +54,7 @@ namespace commands{
std::string connectedBrokers;
std::string reconnectTo;
bool rebalanceConnection;
std::vector<unsigned char> token;

public:

Expand Down Expand Up @@ -106,6 +107,10 @@ namespace commands{
virtual bool isRebalanceConnection() const;
virtual void setRebalanceConnection( bool rebalanceConnection );

virtual const std::vector<unsigned char>& getToken() const;
virtual std::vector<unsigned char>& getToken();
virtual void setToken( const std::vector<unsigned char>& token );

/**
* @return an answer of true to the isConnectionControl() query.
*/
Expand Down
Expand Up @@ -76,6 +76,9 @@ void ConnectionControlMarshaller::tightUnmarshal( OpenWireFormat* wireFormat, Da
if( wireVersion >= 6 ) {
info->setRebalanceConnection( bs->readBoolean() );
}
if( wireVersion >= 8 ) {
info->setToken( tightUnmarshalByteArray( dataIn, bs ) );
}
}
AMQ_CATCH_RETHROW( decaf::io::IOException )
AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, decaf::io::IOException )
Expand Down Expand Up @@ -108,6 +111,10 @@ int ConnectionControlMarshaller::tightMarshal1( OpenWireFormat* wireFormat, Data
if( wireVersion >= 6 ) {
bs->writeBoolean( info->isRebalanceConnection() );
}
if( wireVersion >= 8 ) {
bs->writeBoolean( info->getToken().size() != 0 );
rc += info->getToken().size() == 0 ? 0 : (int)info->getToken().size() + 4;
}

return rc + 0;
}
Expand Down Expand Up @@ -142,6 +149,12 @@ void ConnectionControlMarshaller::tightMarshal2( OpenWireFormat* wireFormat, Dat
if( wireVersion >= 6 ) {
bs->readBoolean();
}
if( wireVersion >= 8 ) {
if( bs->readBoolean() ) {
dataOut->writeInt( (int)info->getToken().size() );
dataOut->write( (const unsigned char*)(&info->getToken()[0]), (int)info->getToken().size(), 0, (int)info->getToken().size() );
}
}
}
AMQ_CATCH_RETHROW( decaf::io::IOException )
AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, decaf::io::IOException )
Expand Down Expand Up @@ -173,6 +186,9 @@ void ConnectionControlMarshaller::looseUnmarshal( OpenWireFormat* wireFormat, Da
if( wireVersion >= 6 ) {
info->setRebalanceConnection( dataIn->readBoolean() );
}
if( wireVersion >= 8 ) {
info->setToken( looseUnmarshalByteArray( dataIn ) );
}
}
AMQ_CATCH_RETHROW( decaf::io::IOException )
AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, decaf::io::IOException )
Expand Down Expand Up @@ -204,6 +220,13 @@ void ConnectionControlMarshaller::looseMarshal( OpenWireFormat* wireFormat, Data
if( wireVersion >= 6 ) {
dataOut->writeBoolean( info->isRebalanceConnection() );
}
if( wireVersion >= 8 ) {
dataOut->write( info->getToken().size() != 0 );
if( info->getToken().size() != 0 ) {
dataOut->writeInt( (int)info->getToken().size() );
dataOut->write( (const unsigned char*)(&info->getToken()[0]), (int)info->getToken().size(), 0, (int)info->getToken().size() );
}
}
}
AMQ_CATCH_RETHROW( decaf::io::IOException )
AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, decaf::io::IOException )
Expand Down

0 comments on commit f4cd155

Please sign in to comment.