Skip to content

Commit

Permalink
SRAMP-433 JMS events
Browse files Browse the repository at this point in the history
  • Loading branch information
brmeyer committed Aug 28, 2014
1 parent b62d4a0 commit 331d36c
Show file tree
Hide file tree
Showing 34 changed files with 1,274 additions and 91 deletions.
30 changes: 30 additions & 0 deletions pom.xml
Expand Up @@ -273,6 +273,16 @@
<artifactId>s-ramp-distro-shell</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events-jms</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-installer</artifactId>
Expand Down Expand Up @@ -437,6 +447,18 @@
<version>${project.version}</version>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events</artifactId>
<version>${project.version}</version>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events-jms</artifactId>
<version>${project.version}</version>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-repository</artifactId>
Expand Down Expand Up @@ -571,6 +593,12 @@
<artifactId>overlord-commons-config</artifactId>
<version>${version.org.overlord.overlord-commons}</version>
</dependency>
<dependency>
<groupId>org.overlord</groupId>
<artifactId>overlord-commons-dist-eap6</artifactId>
<version>${overlord-commons.version}</version>
<type>zip</type>
</dependency>
<dependency>
<groupId>org.overlord</groupId>
<artifactId>overlord-commons-errai-fuse-support</artifactId>
Expand Down Expand Up @@ -940,6 +968,8 @@
<module>s-ramp-demos</module>
<module>s-ramp-dev-server</module>
<module>s-ramp-distro</module>
<module>s-ramp-events</module>
<module>s-ramp-events-jms</module>
<module>s-ramp-installer</module>
<module>s-ramp-integration</module>
<module>s-ramp-javadoc</module>
Expand Down
1 change: 1 addition & 0 deletions s-ramp-api/.gitignore
Expand Up @@ -6,3 +6,4 @@
/.settings
/bin
/target
/target
Expand Up @@ -51,15 +51,17 @@ public class SrampConstants {
public static final QName SRAMP_TOTAL_RESULTS_QNAME = new QName(SRAMP_NS, SRAMP_TOTAL_RESULTS, SRAMP_PREFIX);

// Configuration constants
public static final String SRAMP_CONFIG_FILE_NAME = "sramp.config.file.name"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_FILE_REFRESH = "sramp.config.file.refresh"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_BASEURL = "sramp.config.baseurl"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_AUDITING = "sramp.config.auditing.enabled"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_DERIVED_AUDITING = "sramp.config.auditing.enabled-derived"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_AUDIT_USER = "sramp.config.auditing.user"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_AUDIT_PASS = "sramp.config.auditing.password"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_JCR_REPO_JNDI = "sramp.config.jcr.repository.jndi-path"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_JCR_REPO_NAME = "sramp.config.jcr.repository.name"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_FILE_NAME = "sramp.config.file.name"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_FILE_REFRESH = "sramp.config.file.refresh"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_BASEURL = "sramp.config.baseurl"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_AUDITING = "sramp.config.auditing.enabled"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_DERIVED_AUDITING = "sramp.config.auditing.enabled-derived"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_AUDIT_USER = "sramp.config.auditing.user"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_AUDIT_PASS = "sramp.config.auditing.password"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_JCR_REPO_JNDI = "sramp.config.jcr.repository.jndi-path"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_JCR_REPO_NAME = "sramp.config.jcr.repository.name"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_EVENT_TOPICS = "sramp.config.events.jms.topics"; //$NON-NLS-1$
public static final String SRAMP_CONFIG_EVENT_QUEUES = "sramp.config.events.jms.queues"; //$NON-NLS-1$

// Location of a directory containing JARs which provide custom derivers
public static final String SRAMP_CUSTOM_DERIVER_DIR = "sramp.derivers.customDir"; //$NON-NLS-1$
Expand Down
10 changes: 10 additions & 0 deletions s-ramp-distro/assembly/pom.xml
Expand Up @@ -127,6 +127,16 @@
<artifactId>s-ramp-common</artifactId>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events</artifactId>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events-jms</artifactId>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-repository</artifactId>
Expand Down
62 changes: 62 additions & 0 deletions s-ramp-events-jms/pom.xml
@@ -0,0 +1,62 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp</artifactId>
<version>0.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>s-ramp-events-jms</artifactId>
<packaging>bundle</packaging>
<name>S-RAMP Events: JMS</name>
<description>JMS implementation of the S-RAMP Events functionality.</description>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<version>${version.org.apache.activemq}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-common</artifactId>
</dependency>
<dependency>
<groupId>org.overlord.sramp</groupId>
<artifactId>s-ramp-events</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin>
</plugins>
</build>

</project>
@@ -0,0 +1,229 @@
/*
* Copyright 2014 JBoss Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.overlord.sramp.events.jms;

import java.util.ArrayList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.lang.StringUtils;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.codehaus.jackson.map.ObjectMapper;
import org.oasis_open.docs.s_ramp.ns.s_ramp_v1.BaseArtifactType;
import org.overlord.sramp.common.Sramp;
import org.overlord.sramp.common.SrampConstants;
import org.overlord.sramp.events.ArtifactUpdateEvent;
import org.overlord.sramp.events.EventProducer;
import org.overlord.sramp.events.OntologyUpdateEvent;
import org.overlord.sramp.events.jms.i18n.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3._1999._02._22_rdf_syntax_ns_.RDF;

/**
* Provides a JMS implementation of the {@link EventProducer}.
*
* @author Brett Meyer
*/
@Component(name = "JMS Event Producer", immediate = true)
@Service(value = EventProducer.class)
public class JMSEventProducer implements EventProducer {

public static final String JMS_TYPE_ARTIFACT_CREATED = "sramp:artifactCreated";
public static final String JMS_TYPE_ARTIFACT_UPDATED = "sramp:artifactUpdated";
public static final String JMS_TYPE_ARTIFACT_DELETED = "sramp:artifactDeleted";
public static final String JMS_TYPE_ONTOLOGY_CREATED = "sramp:ontologyCreated";
public static final String JMS_TYPE_ONTOLOGY_UPDATED = "sramp:ontologyUpdated";
public static final String JMS_TYPE_ONTOLOGY_DELETED = "sramp:ontologyDeleted";

private static final String CONNECTIONFACTORY_JNDI = "ConnectionFactory"; //$NON-NLS-1$

// TODO: Should the port be configurable?
private static final String ACTIVEMQ_PROVIDER_URL = "tcp://localhost:61616";

private static Logger LOG = LoggerFactory.getLogger(JMSEventProducer.class);

private static final Sramp sramp = new Sramp();

private Connection connection = null;

private Session session = null;

private List<Destination> destinations = new ArrayList<Destination>();

@Override
public void startup() {
try {
// Note that both properties end up doing the same thing. Technically, we could combine both into one
// single sramp.config.events.jms.destinations, but leaving them split for readability.

String topicNamesProp = sramp.getConfigProperty(SrampConstants.SRAMP_CONFIG_EVENT_TOPICS, "");
String[] topicNames = new String[0];
if (StringUtils.isNotEmpty(topicNamesProp)) {
topicNames = topicNamesProp.split(",");
}

String queueNamesProp = sramp.getConfigProperty(SrampConstants.SRAMP_CONFIG_EVENT_QUEUES, "");
String[] queueNames = new String[0];
if (StringUtils.isNotEmpty(queueNamesProp)) {
queueNames = queueNamesProp.split(",");
}

try {
// First, see if a ConnectionFactory and Topic/Queue exists on JNDI. If so, assume JMS is properly
// setup in a Java EE container and simply use it.

ConnectionFactory connectionFactory = (ConnectionFactory) jndiLookup(CONNECTIONFACTORY_JNDI);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

for (String topicName : topicNames) {
Topic topic = (Topic) jndiLookup(topicName);
destinations.add(topic);
}

for (String queueName : queueNames) {
Queue queue = (Queue) jndiLookup(queueName);
destinations.add(queue);
}
} catch (NamingException e) {
// JMS wasn't setup. Assume we need to start an embedded ActiveMQ broker and create the destinations.
LOG.warn(Messages.i18n.format("org.overlord.sramp.events.jms.embedded_broker"));

session = null;
destinations.clear();

BrokerService broker = new BrokerService();
broker.addConnector(ACTIVEMQ_PROVIDER_URL);
// TODO: Add security plugin? JAAS?
broker.start();

// Event though we added a TCP connector, above, ActiveMQ also exposes the broker over the "vm"
// protocol. It optimizes performance for connections on the same JVM.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

for (String topicName : topicNames) {
destinations.add(session.createTopic(topicName));
}

for (String queueName : queueNames) {
destinations.add(session.createQueue(queueName));
}
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

@Override
public void artifactCreated(BaseArtifactType artifact) {
publishEvent(artifact, JMS_TYPE_ARTIFACT_CREATED);
}

@Override
public void artifactUpdated(BaseArtifactType updatedArtifact, BaseArtifactType oldArtifact) {
ArtifactUpdateEvent event = new ArtifactUpdateEvent(updatedArtifact, oldArtifact);
publishEvent(event, JMS_TYPE_ARTIFACT_UPDATED);
}

@Override
public void artifactDeleted(BaseArtifactType artifact) {
publishEvent(artifact, JMS_TYPE_ARTIFACT_DELETED);
}

@Override
public void ontologyCreated(RDF ontology) {
publishEvent(ontology, JMS_TYPE_ONTOLOGY_CREATED);
}

@Override
public void ontologyUpdated(RDF updatedOntology, RDF oldOntology) {
OntologyUpdateEvent event = new OntologyUpdateEvent(updatedOntology, oldOntology);
publishEvent(event, JMS_TYPE_ONTOLOGY_UPDATED);
}

@Override
public void ontologyDeleted(RDF ontology) {
publishEvent(ontology, JMS_TYPE_ONTOLOGY_DELETED);
}

private void publishEvent(Object payload, String type) {
for (Destination destination : destinations) {
MessageProducer producer = null;
try {
producer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage();
textMessage.setJMSType(type);

ObjectMapper mapper = new ObjectMapper();
String text = mapper.writeValueAsString(payload);
textMessage.setText(text);

producer.send(textMessage);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
if (producer != null) {
try {
producer.close();
} catch (Exception e) {
}
}
}
}
}

private Object jndiLookup(String name) throws NamingException {
Context initContext = new InitialContext();
try {
Context jndiContext = (Context) initContext.lookup("java:comp/env");
return jndiContext.lookup(name);
} catch (NamingException e) {
// EAP (no namespace)
Context jndiContext = (Context) initContext.lookup("java:");
return jndiContext.lookup(name);
}
}

@Override
public void shutdown() {
try {
session.close();
} catch (Exception e) {
}
try {
connection.close();
} catch (Exception e) {
}
}
}

0 comments on commit 331d36c

Please sign in to comment.