Skip to content

Commit

Permalink
SRAMP-433 rough-draft design and JMS impl
Browse files Browse the repository at this point in the history
  • Loading branch information
brmeyer committed Jun 30, 2014
1 parent 4ac7409 commit 258b3f2
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ target
/RemoteSystemsTempFiles
.errai
*~
.checkstyle
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@
<artifactId>s-ramp-distro-shell</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-installer</artifactId>
Expand Down Expand Up @@ -989,6 +994,8 @@
<module>s-ramp-distro/fabric</module>
<module>s-ramp-distro/fuse61</module>
<module>s-ramp-distro/shell</module>
<module>s-ramp-events</module>
<module>s-ramp-events-jms</module>
<module>s-ramp-installer</module>
<module>s-ramp-integration/java</module>
<module>s-ramp-integration/kie</module>
Expand Down
36 changes: 36 additions & 0 deletions s-ramp-events-jms/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp</artifactId>
<version>0.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>s-ramp-events-jms</artifactId>
<packaging>bundle</packaging>
<name>S-RAMP Events: JMS</name>
<description>JMS implementation of the S-RAMP Events functionality.</description>

<dependencies>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright 2014 JBoss Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.overlord.sramp.events.jms;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.overlord.sramp.events.EventProducer;

/**
* Provides a JMS implementation of the {@link EventProducer}.
*
* @author Brett Meyer
*/
@Component(name = "JMS Event Producer", immediate = true)
@Service(value = EventProducer.class)
public class JMSEventProducer extends EventProducer {

// TODO: May need to be configurable based on the platform
private static final String TOPICCONNECTIONFACTORY_JNDI = "TopicConnectionFactory"; //$NON-NLS-1$
// TODO: May need to be configurable based on the platform
private static final String TOPIC_JNDI = "sramp/events/topic"; //$NON-NLS-1$
// TODO: May need to be configurable based on the platform
private static final String QUEUECONNECTIONFACTORY_JNDI = "QueueConnectionFactory"; //$NON-NLS-1$
// TODO: May need to be configurable based on the platform
private static final String QUEUE_JNDI = "sramp/events/queue"; //$NON-NLS-1$

private Context jndiContext = null;
private TopicConnectionFactory topicConnectionFactory = null;
private QueueConnectionFactory queueConnectionFactory = null;

private Map<String, Queue> activeQueues = new HashMap<String, Queue>();

/* (non-Javadoc)
* @see org.overlord.sramp.events.EventProducer#doStartup()
*/
@Override
protected void doStartup() {
try {
jndiContext = new InitialContext();
topicConnectionFactory = (TopicConnectionFactory) jndiContext.lookup(TOPICCONNECTIONFACTORY_JNDI);
queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup(QUEUECONNECTIONFACTORY_JNDI);
} catch (Exception e) {
// TODO
}
}

/* (non-Javadoc)
* @see org.overlord.sramp.events.EventProducer#doShutdown()
*/
@Override
protected void doShutdown() {
queueConnectionFactory = null;
topicConnectionFactory = null;
jndiContext = null;
}

/*
* (non-Javadoc)
*
* @see org.overlord.sramp.events.EventProducer#publishEvent(java.lang.Object)
*/
@Override
public void publishEvent(Serializable event) {
publishTopic(event);
publishQueues(event);
}

private void publishTopic(Serializable event) {
TopicConnection topicConnection = null;
try {
topicConnection = topicConnectionFactory.createTopicConnection();
TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = (Topic) jndiContext.lookup(TOPIC_JNDI);
TopicPublisher publisher = session.createPublisher(topic);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(event);
publisher.send(objectMessage);
} catch (Exception e) {
// TODO
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (Exception e) {
// TODO
}
}
}
}

private void publishQueues(Serializable event) {
QueueConnection queueConnection = null;
try {
queueConnection = queueConnectionFactory.createQueueConnection();
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
for (Queue queue : activeQueues.values()) {
MessageProducer producer = session.createProducer(queue);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(event);
producer.send(objectMessage);
}
} catch (Exception e) {
// TODO
} finally {
if (queueConnection != null) {
try {
queueConnection.close();
} catch (Exception e) {
// TODO
}
}
}
}

/* (non-Javadoc)
* @see org.overlord.sramp.events.EventProducer#startQueue(java.lang.String)
*/
@Override
public void startQueue(String name) {
try {
Queue queue = (Queue) jndiContext.lookup(QUEUE_JNDI);
activeQueues.put(name, queue);
} catch (Exception e) {
// TODO
}
}

/* (non-Javadoc)
* @see org.overlord.sramp.events.EventProducer#stopQueue(java.lang.String)
*/
@Override
public void stopQueue(String name) {
if (! activeQueues.containsKey(name)) {
// TODO
}
activeQueues.remove(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.overlord.sramp.events.jms.JMSEventProducer
32 changes: 32 additions & 0 deletions s-ramp-events/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp</artifactId>
<version>0.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>s-ramp-events</artifactId>
<packaging>bundle</packaging>
<name>S-RAMP Events</name>
<description>S-RAMP Events enables clients to receive event notifications through point-to-point queues and pub/sub topics.</description>

<dependencies>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2014 JBoss Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.overlord.sramp.events;

import java.io.Serializable;


/**
* @author Brett Meyer
*/
public abstract class EventProducer {

/**
* Method called to start and initialize the EventProducer implementation.
*/
public final void startup() {
doStartup();
}

/**
* Starts up the EventProducer.
*/
protected abstract void doStartup();

/**
* Method called when the EventProducer implementation is no longer needed.
*/
public final void shutdown() {
doShutdown();
}

/**
* Shuts down the EventProducer.
*/
protected abstract void doShutdown();

/**
* Interally called by S-RAMP to publish an event, both through pub/sub and point-to-point queues.
*
* @param event The event to publish
*/
// TODO: Probably not a Serializable...
public abstract void publishEvent(Serializable event);

/**
* Start publishing message to the specified queue.
*
* @return String The queue to start
*/
public abstract void startQueue(String name);

/**
* Stop publishing message to the specified queue.
*
* @param name The queue to stop
*/
public abstract void stopQueue(String name);
}

0 comments on commit 258b3f2

Please sign in to comment.