Skip to content

Commit

Permalink
Adding more examples for the CPP client, this is a simple async consu…
Browse files Browse the repository at this point in the history
…mer example and a simple producer.

git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-cpp/trunk@588700 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Timothy A. Bish committed Oct 26, 2007
1 parent ae2bf44 commit f4676a3
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 3 deletions.
8 changes: 7 additions & 1 deletion src/examples/Makefile.am
Expand Up @@ -28,8 +28,14 @@ bin_PROGRAMS = example
example_SOURCES = $(main_example_sources)
example_LDADD= ../main/libactivemq-cpp.la

## Simple Consumer
## Simple Async Consumer
simple_async_consumer_sources = ./consumers/SimpleAsyncConsumer.cpp
bin_PROGRAMS += simple_async_consumer
simple_async_consumer_SOURCES = $(simple_async_consumer_sources)
simple_async_consumer_LDADD= ../main/libactivemq-cpp.la

## Simple Producer
simple_producer_sources = ./producers/SimpleProducer.cpp
bin_PROGRAMS += simple_producer
simple_producer_SOURCES = $(simple_producer_sources)
simple_producer_LDADD= ../main/libactivemq-cpp.la
7 changes: 5 additions & 2 deletions src/examples/consumers/SimpleAsyncConsumer.cpp
Expand Up @@ -204,7 +204,6 @@ int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
// "&transport.tcpTracingEnabled=true";
// "&wireFormat.tightEncodingEnabled=true";


//============================================================
// This is the Destination Name and URI options. Use this to
// customize where the consumer listens, to have the consumer
Expand All @@ -215,7 +214,7 @@ int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in both consumer an producer.
// createQueue to be used in the consumer.
//============================================================
bool useTopics = true;

Expand All @@ -228,4 +227,8 @@ int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
// Wait to exit.
std::cout << "Press 'q' to quit" << std::endl;
while( std::cin.get() != 'q') {}

std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
}
215 changes: 215 additions & 0 deletions src/examples/producers/SimpleProducer.cpp
@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <activemq/util/Config.h>
#include <activemq/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <iostream>

using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:

Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;

public:

SimpleProducer( const std::string& brokerURI,
unsigned int numMessages,
const std::string& destURI,
bool useTopic = false ){
connection = NULL;
session = NULL;
destination = NULL;
producer = NULL;
this->numMessages = numMessages;
this->useTopic = useTopic;
this->brokerURI = brokerURI;
this->destURI = destURI;
}

virtual ~SimpleProducer(){
cleanup();
}

virtual void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory( brokerURI );

// Create a Connection
connection = connectionFactory->createConnection();
connection->start();

// free the factory, we are done with it.
delete connectionFactory;

// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( "TEST.FOO" );
} else {
destination = session->createQueue( "TEST.FOO" );
}

// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

// Create the Thread Id String
string threadIdStr = Integer::toString( Thread::getId() );

// Create a messages
string text = (string)"Hello world! from thread " + threadIdStr;

for( int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text );

message->setIntProperty( "Integer", ix );

// Tell the producer to send the message
printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
producer->send( message );

delete message;
}

}catch ( CMSException& e ) {
e.printStackTrace();
}
}

private:

void cleanup(){

// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) { e.printStackTrace(); }
destination = NULL;

try{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) { e.printStackTrace(); }
producer = NULL;

// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) { e.printStackTrace(); }

try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) { e.printStackTrace(); }
session = NULL;

try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) { e.printStackTrace(); }
connection = NULL;
}
};

////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------\n";

// Set the URI to point to the IPAddress of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS website for
// a full list of configuration options.
//
// http://activemq.apache.org/cms/
//
// Wire Foormat Options:
// =====================
// Use either stomp or openwire, the default ports are different for each
//
// Examples:
// tcp://127.0.0.1:61616 default to openwire
// tcp://127.0.0.1:61616?wireFormat=openwire same as above
// tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead
//
std::string brokerURI =
"tcp://127.0.0.1:61616"
"?wireFormat=openwire"
"&transport.useAsyncSend=true";
// "&transport.commandTracingEnabled=true"
// "&transport.tcpTracingEnabled=true";
// "&wireFormat.tightEncodingEnabled=true";

//============================================================
// Total number of messages for this producer to send.
//============================================================
unsigned int numMessages = 2000;

//============================================================
// This is the Destination Name and URI options. Use this to
// customize where the Producer produces, to have the producer
// use a topic or queue set the 'useTopics' flag.
//============================================================
std::string destURI = "TEST.FOO";

//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in the producer.
//============================================================
bool useTopics = true;

// Create the producer and run it.
SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
producer.run();

std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
}

0 comments on commit f4676a3

Please sign in to comment.