Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…unk@630693 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Nathan Christopher Mittler committed Feb 24, 2008
1 parent 1f2877f commit f363201
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions src/main/activemq/connector/openwire/OpenWireConnector.cpp
Expand Up @@ -83,7 +83,10 @@ OpenWireConnector::OpenWireConnector( Transport* transport,
// Create our WireFormatFactory on the stack, only need it once.
OpenWireFormatFactory wireFormatFactory;

this->state = CONNECTION_STATE_DISCONNECTED;
synchronized( &mutex ) {
this->state = CONNECTION_STATE_DISCONNECTED;
}

this->exceptionListener = NULL;
this->messageListener = NULL;
this->brokerInfo = NULL;
Expand Down Expand Up @@ -143,7 +146,7 @@ void OpenWireConnector::start() throw( cms::CMSException ) {
__FILE__, __LINE__,
"OpenWireConnector::start - already started" );
}

// Start the transport - this establishes the socket.
transport->start();

Expand All @@ -169,7 +172,7 @@ void OpenWireConnector::close() throw( cms::CMSException ){

// Send the disconnect message to the broker.
disconnect();

// Close the transport now that we've sent the last messages..
transport->close();
}
Expand All @@ -184,8 +187,10 @@ void OpenWireConnector::connect() throw ( ConnectorException ) {

try{

// Mark this connector as started.
state = CONNECTION_STATE_CONNECTING;
synchronized( &mutex ) {
// Mark this connector as started.
state = CONNECTION_STATE_CONNECTING;
}

// Fill in our connection info.
connectionInfo.setUserName( getUsername() );
Expand All @@ -203,12 +208,14 @@ void OpenWireConnector::connect() throw ( ConnectorException ) {
commands::ConnectionId* connectionId = new commands::ConnectionId();
connectionId->setValue( UUID::randomUUID().toString() );
connectionInfo.setConnectionId( connectionId );

// Now we ping the broker and see if we get an ack / nack
Response* response = syncRequest( &connectionInfo );

// Tag us in the Connected State now.
state = CONNECTION_STATE_CONNECTED;
synchronized( &mutex ) {
// Tag us in the Connected State now.
state = CONNECTION_STATE_CONNECTED;
}

// Clean up the ack
delete response;
Expand All @@ -223,8 +230,10 @@ void OpenWireConnector::disconnect() throw ( ConnectorException ) {

try{

// Mark state as no longer connected.
state = CONNECTION_STATE_DISCONNECTED;
synchronized( &mutex ) {
// Mark state as no longer connected.
state = CONNECTION_STATE_DISCONNECTED;
}

// Remove our ConnectionId from the Broker
disposeOf( connectionInfo.getConnectionId() );
Expand Down Expand Up @@ -1394,10 +1403,12 @@ void OpenWireConnector::onTransportException(
if( state == CONNECTION_STATE_DISCONNECTED ){
return;
}

// Mark the fact that we are in an error state
state = CONNECTION_STATE_ERROR;


synchronized( &mutex ) {
// Mark the fact that we are in an error state
state = CONNECTION_STATE_ERROR;
}

// Inform the user of the error.
fire( ex );
}
Expand All @@ -1423,7 +1434,7 @@ Response* OpenWireConnector::syncRequest( Command* command )
throw ( ConnectorException ) {

try {

Response* response = transport->request( command );

commands::ExceptionResponse* exceptionResponse =
Expand Down

0 comments on commit f363201

Please sign in to comment.