Skip to content
Browse files

[ignore] removal of replication extension; moved to new repo: https:/…

  • Loading branch information...
1 parent 6f52a82 commit 85b2f69797447c221a5383aa24deae944ab27ca8 @dizzzz dizzzz committed Sep 19, 2013
Showing with 0 additions and 5,143 deletions.
  1. +0 −121 extensions/replication/doc/README.txt
  2. +0 −7 extensions/replication/doc/TODO.txt
  3. +0 −73 extensions/replication/doc/conf.xml
  4. +0 −55 extensions/replication/doc/example.xconf
  5. +0 −15 extensions/replication/extension.xml
  6. +0 −67 extensions/replication/ivy.xml
  7. +0 −16 extensions/replication/ivysettings.xml
  8. +0 −240 extensions/replication/src/org/exist/messaging/JmsMessageReceiver.java
  9. +0 −257 extensions/replication/src/org/exist/messaging/JmsMessageSender.java
  10. +0 −36 extensions/replication/src/org/exist/messaging/MessageReceiver.java
  11. +0 −37 extensions/replication/src/org/exist/messaging/MessageSender.java
  12. +0 −191 extensions/replication/src/org/exist/messaging/MyListener.java
  13. +0 −74 extensions/replication/src/org/exist/messaging/configuration/JmsMessagingConfiguration.java
  14. +0 −40 extensions/replication/src/org/exist/messaging/configuration/MessagingConfiguration.java
  15. +0 −35 extensions/replication/src/org/exist/messaging/configuration/MessagingMetadata.java
  16. +0 −121 extensions/replication/src/org/exist/messaging/configuration/NodeParser.java
  17. +0 −78 extensions/replication/src/org/exist/messaging/xquery/MessagingModule.java
  18. +0 −127 extensions/replication/src/org/exist/messaging/xquery/ReceiveMessage.java
  19. +0 −116 extensions/replication/src/org/exist/messaging/xquery/SendMessage.java
  20. +0 −273 extensions/replication/src/org/exist/replication/jms/obsolete/FileSystemListener.java
  21. +0 −83 extensions/replication/src/org/exist/replication/jms/obsolete/ResourceReplicator.java
  22. +0 −83 extensions/replication/src/org/exist/replication/jms/obsolete/Sender.java
  23. +0 −229 extensions/replication/src/org/exist/replication/jms/publish/JMSMessageSender.java
  24. +0 −141 extensions/replication/src/org/exist/replication/jms/publish/PublisherParameters.java
  25. +0 −447 extensions/replication/src/org/exist/replication/jms/publish/ReplicationTrigger.java
  26. +0 −957 extensions/replication/src/org/exist/replication/jms/subscribe/JMSMessageListener.java
  27. +0 −47 extensions/replication/src/org/exist/replication/jms/subscribe/MessageReceiveException.java
  28. +0 −182 extensions/replication/src/org/exist/replication/jms/subscribe/MessageReceiverStartupTrigger.java
  29. +0 −197 extensions/replication/src/org/exist/replication/jms/subscribe/SubscriberParameters.java
  30. +0 −33 extensions/replication/src/org/exist/replication/shared/ClientParameterException.java
  31. +0 −194 extensions/replication/src/org/exist/replication/shared/ClientParameters.java
  32. +0 −58 extensions/replication/src/org/exist/replication/shared/JmsConnectionExceptionListener.java
  33. +0 −55 extensions/replication/src/org/exist/replication/shared/JmsConnectionHelper.java
  34. +0 −175 extensions/replication/src/org/exist/replication/shared/MessageHelper.java
  35. +0 −39 extensions/replication/src/org/exist/replication/shared/MessageSender.java
  36. +0 −46 extensions/replication/src/org/exist/replication/shared/TransportException.java
  37. +0 −198 extensions/replication/src/org/exist/replication/shared/eXistMessage.java
View
121 extensions/replication/doc/README.txt
@@ -1,121 +0,0 @@
-$Id$
-
-This document provides a short introduction on the document replication function
-of eXist-db.
-
- "You want to configure two or more eXist instances to work together to
- automatically synchronize collection-specific data sets. This allows you
- to scale your eXist server capacity. For example, with multiple eXist servers
- configured to stay in sync as described below, you could add a load-balancer
- to distribute the load of incoming queries across the pool of servers
- and still maintain high performance."
-
- Quoted from http://en.wikibooks.org/wiki/XQuery/eXist_Clustering
-
-Fortunately the steps are not too complex.
-
-
-Preparation
-===========
-
-ActimeMQ
---------
-- Download recent version from ActiveMQ from http://activemq.apache.org/download.html ;
- Note that the TGZ file has additional unix (linux, MacOsX) support, the ZIP file
- is for Windows. The contents of the archives actually differ.
-- Extract content to disk, refered as ACTIVEMQ_HOME
-- Copy the activemq-all-X.Y.Z.jar file to EXIST_HOME/lib/user
-
-eXistdb
--------
-- Build replication extension (modify extensions/local.build.properties) or copy the
- pre-built version of exist-replication.jar to lib/extensions
-
-
-
-Get Started
-===========
-- For 'Master' server (publisher)
- - Create collection '/db/mycollection/' that shall be monitored for document changes
- - Create collection '/db/system/config/db/mycollection/'
- - Create in there a document 'collection.xconf' and add the following content to
- the document:
-
- <collection xmlns="http://exist-db.org/collection-config/1.0">
- <triggers>
- <trigger class="org.exist.replication.jms.publish.ReplicationTrigger">
-
- <parameter name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
- <parameter name="java.naming.provider.url" value="tcp://myserver.local:61616"/>
-
- <parameter name="connectionfactory" value="ConnectionFactory"/>
- <parameter name="topic" value="dynamicTopics/eXistdb"/>
-
- <!-- Set value -->
- <parameter name="client-id" value="SetPublisherId"/>
-
- </trigger>
- </triggers>
- </collection>
-
- - Set the correct value for 'java.naming.provider.url' that matches your message broker
- - Set a unique value for the "client-id" parameter.
-
-
-- For each 'Slave' (subscriber)
- - Add a startup trigger to conf.xml:
-
- <trigger class="org.exist.replication.jms.subscribe.MessageReceiverStartupTrigger">>
-
- <parameter name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
- <parameter name="java.naming.provider.url" value="tcp://myserver.local:61616"/>
-
- <parameter name="connectionfactory" value="ConnectionFactory"/>
- <parameter name="topic" value="dynamicTopics/eXistdb"/>
-
- <!-- set values -->
- <parameter name="client-id" value="SetSubscriberId"/>
- <parameter name="subscriber-name" value="SetSubscriptionId"/>
-
- </trigger>
-
- - Set the correct value for 'java.naming.provider.url' that matches your message broker
- - Set unique values for "client-id" and 'subscriber-name'
-
- - Create the collection '/db/mycollection/' , this is the collection that receives the
- documents that are updated in the same collection on the 'Master' server.
-
-
-
-
-Start-up
---------
-- Start ActiveMQ server:
- cd ACTIVEMQ_HOME
- ./bin/activemq start [for mac, use the bin/macosx wrapper directory]
-
-- Start Slave
- cd EXISTSLAVE_HOME
- ./bin/startup.sh
-
-- Start Master
- cd EXISTMASTER_HOME
- ./bin/startup.sh
-
-
-Distribute
-----------
-- Create a document in the master server in '/db/mycollection/' (e.g. using the
- java client or eXide; login as admin); The document will be automatically
- replicated to the slave servers.
-
-
-Performance Test
-----------------
-- With eXide upload a +- 50k XML document store as /db/mydoc.xml
- Execute the query, check the timing on the slave (see exist.log)
-
- let $doc := doc('/db/mydoc.xml')
- for $i in (1000 to 3000)
- return
- xmldb:store('/db/mycollection', concat('mydoc', $i , ".xml"), $doc)
View
7 extensions/replication/doc/TODO.txt
@@ -1,7 +0,0 @@
-ToDo's
-
-- handle exceptions, stop initialization and report if needed
-- require Topic as destination, else fail
-
-- prevent infinite loop if node is sender and receiver
- - switch - off triggers when writing documents.
View
73 extensions/replication/doc/conf.xml
@@ -1,73 +0,0 @@
-<!--
- Start JMS listener for listener of the clustering feature.
--->
-<trigger
- class="org.exist.replication.jms.subscribe.MessageReceiverStartupTrigger">
- <!--
- Class name of the initial context provider, default value for ActiveMQ
- see javax.naming.Context#INITIAL_CONTEXT_FACTORY
- -->
- <parameter name="java.naming.factory.initial"
- value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
-
- <!--
- URL of the message broker, default value for ActiveMQ
- see javax.naming.Context#PROVIDER_URL
- -->
- <parameter name="java.naming.provider.url"
- value="tcp://myserver.local:61616"/>
-
- <!--
- Lookup connection factory
- see javax.naming.InitialContext#lookup(String)
- -->
- <parameter name="connectionfactory" value="ConnectionFactory"/>
-
- <!--
- Lookup destination (topic)
- see javax.naming.InitialContext#lookup(String)
- -->
- <parameter name="topic" value="dynamicTopics/eXistdb"/>
-
- <!--
- Set client identifier for this connection. Required for durable
- subscriptions (default setting)
- see javax.jms.Connection#setClientID(string)
- -->
- <parameter name="client-id" value="SubscriberId"/>
-
- <!--
- Set the name used to identify this subscription
- see JMS javax.jms.TopicSession#createDurableSubscriber(Topic,String)
- -->
- <parameter name="subscriber-name" value="SubscriptionId"/>
-
- <!--
- Set the subscription is durable. default = yes
- see JMS javax.jms.Session#createDurableSubscriber(Topic,String)
- see JMS javax.jms.Session#createConsumer(Destination, String, boolean)
-
- [Optional]
- -->
- <!--<parameter name="durable" value="yes"/>-->
-
- <!--
- A JMS API message selector allows a message consumer to specify
- the messages it is interested in. The syntax of the expression is based
- on a subset of the SQL92 conditional expression syntax.
-
- see javax.jms.Message
-
- [Optional]
- -->
- <!--<parameter name="messageselector"
- value="property1 = 'a' OR property2 = 'b'"/>-->
-
- <!--
- If set, inhibits delivery of messages published by its own connection
- default = yes
-
- [Optional]
- -->
- <!--<parameter name="nolocal" value="yes"/>-->
-</trigger>
View
55 extensions/replication/doc/example.xconf
@@ -1,55 +0,0 @@
-<collection xmlns="http://exist-db.org/collection-config/1.0">
- <triggers>
- <trigger class="org.exist.replication.jms.publish.ReplicationTrigger">
-
- <!--
- Class name of the initial context provider, default value for ActiveMQ
- see javax.naming.Context#INITIAL_CONTEXT_FACTORY
- -->
- <parameter name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
-
- <!--
- URL of the message broker, default value for ActiveMQ
- see javax.naming.Context#PROVIDER_URL
- -->
- <parameter name="java.naming.provider.url" value="tcp://myserver.local:61616"/>
-
- <!--
- Lookup connection factory
- see javax.naming.InitialContext#lookup(String)
- -->
- <parameter name="connectionfactory" value="ConnectionFactory"/>
-
- <!--
- Lookup destination (topic)
- see javax.naming.InitialContext#lookup(String)
- -->
- <parameter name="topic" value="dynamicTopics/eXistdb"/>
-
- <!--
- Set client identifier for this connection.
- see javax.jms.Connection#setClientID(string)
-
- [Optional]
- -->
- <!-- <parameter name="client-id" value="PublisherId"/> -->
-
- <!--
- Set time-to-live in milliseconds, default value is 0 (unlimited)
- see javax.jms.MessageProducer#setTimeToLive(long)
-
- [Optional]
- -->
- <!-- <parameter name="time-to-live" value="0"/> -->
-
- <!--
- Sets the producer priority, value 0-9 ; default is 4.
- see javax.jms.MessageProducer#setPriority(int)
-
- [Optional]
- -->
- <!-- <parameter name="priority" value="4"/>-->
-
- </trigger>
- </triggers>
-</collection>
View
15 extensions/replication/extension.xml
@@ -1,15 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project basedir="." default="all" name="replication">
-
- <property file="../local.build.properties"/>
- <property file="../build.properties"/>
-
- <property name="extension.name" value="${ant.project.name}"/>
- <property name="existhome.dir" location="../.."/>
-
- <property name="extension.include" value="${include.feature.replication}"/>
-
- <!-- import common.xml here -->
- <import file="${existhome.dir}/build/scripts/extensions-common.xml"/>
-
-</project>
View
67 extensions/replication/ivy.xml
@@ -1,67 +0,0 @@
-<!--
- Ivy module to retrieve the tika jar, including dependancies, excluding duplicate jars
-
- $Id$
--->
-<ivy-module version="2.0">
- <info organisation="org.exist" module="replication"/>
- <dependencies>
-
- <dependency org="org.apache.activemq" name="activemq-all" rev="5.8.0" conf="*->*,!sources,!javadoc">
- <exclude module="slf4j-api"/>
-
- <!--
- Remove the following excludes to enable the new AMQP support
- http://www.amqp.org
- http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
- -->
- <exclude module="proton-jms"/>
- <exclude module="hawtbuf"/>
- <exclude module="activemq-amqp"/>
- </dependency>
-
- <!-- Please leave here -->
- <!-- <dependency org="org.apache.activemq" name="activemq-core" rev="5.6.0" conf="*->*,!sources,!javadoc">
-
- <exclude module="slf4j-api"/>
- <exclude module="commons-logging"/>
- <exclude module="kahadb"/>
- <exclude module="org.osgi.core"/>
- <exclude module="spring-context"/>
- <exclude module="commons-net"/>
- <exclude module="jasypt"/>
-
-
- <exclude module="activemq-openwire-generator"/>
- <exclude module="activemq-protobuf"/>
- <exclude module="activemq-jmdns_1.0"/>
- <exclude module="activemq-jaas"/>
- <exclude module="geronimo-jta_1.0.1B_spec"/>
-
-
- <exclude module="groovy-all"/>
-
-
- <exclude module="xbean-spring"/>
- <exclude module="spring-aop"/>
- <exclude module="commons-logging"/>
-
- <exclude module="commons-pool"/>
- <exclude module="xalan"/>
- <exclude module="ant"/>
- <exclude module="xpp3_min"/>
- <exclude module="derby"/>
-
- <exclude module="aopalliance"/>
-
- <exclude module="xstream"/>
- <exclude module="stax-api"/>
- <exclude module="gram"/>
- <exclude module="jettison"/>
- <exclude module="geronimo-jacc_1.1_spec"/>
- <exclude module="geronimo-annotation_1.0_spec"/>
-
- </dependency>-->
-
- </dependencies>
-</ivy-module>
View
16 extensions/replication/ivysettings.xml
@@ -1,16 +0,0 @@
-<ivysettings>
- <properties file="build.properties" />
- <settings defaultResolver="local-chain"/>
- <resolvers>
- <ibiblio name="ibiblio-maven2" m2compatible="true"/>
- <ibiblio name="java-net-maven2" root="http://download.java.net/maven/2/" m2compatible="true" />
- <ibiblio name="maven" root="http://mvnrepository.com/artifact/" m2compatible="true" />
- <ibiblio name="fuse" root="http://repo.fusesource.com/nexus/content/groups/public/" m2compatible="true" />
- <chain name="local-chain">
- <resolver ref="maven"/>
- <resolver ref="ibiblio-maven2"/>
- <resolver ref="java-net-maven2"/>
- <resolver ref="fuse"/>
- </chain>
- </resolvers>
-</ivysettings>
View
240 extensions/replication/src/org/exist/messaging/JmsMessageReceiver.java
@@ -1,240 +0,0 @@
-
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging;
-
-import java.util.Properties;
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import org.apache.log4j.Logger;
-import org.exist.memtree.NodeImpl;
-import org.exist.messaging.configuration.JmsMessagingConfiguration;
-import org.exist.xquery.XPathException;
-import org.exist.xquery.XQueryContext;
-import org.exist.xquery.value.*;
-
-/**
- *
- * @author wessels
- */
-public class JmsMessageReceiver implements MessageReceiver {
-
- private final static Logger LOG = Logger.getLogger(JmsMessageReceiver.class);
- private XQueryContext xqcontext;
-
- public JmsMessageReceiver(XQueryContext context) {
- xqcontext = context;
- }
-
- @Override
- public NodeImpl receive(JmsMessagingConfiguration jmc, FunctionReference ref) throws XPathException {
-
- // JMS specific checks
- jmc.validateContent();
-
- // Retrieve relevant values
- String initialContextFactory = jmc.getInitalContextProperty(Context.INITIAL_CONTEXT_FACTORY);
-
- String providerURL = jmc.getInitalContextProperty(Context.PROVIDER_URL);
-
- String connectionFactory = jmc.getConnectionFactory();
-
- //String destination = jmc.getDestination();
-
- MyListener myListener = new MyListener(ref, xqcontext);
-
-
- // TODO split up, use more exceptions, add better reporting
- try {
- Properties props = new Properties();
- props.setProperty(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
- props.setProperty(Context.PROVIDER_URL, providerURL);
- javax.naming.Context context = new InitialContext(props);
-
- // Setup connection
- ConnectionFactory cf = (ConnectionFactory) context.lookup(connectionFactory);
-
-
- Connection connection = cf.createConnection();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Destination dest = (Destination) context.lookup("dynamicQueues/Dannes");
-
- MessageConsumer messageConsumer = session.createConsumer(dest);
-
- messageConsumer.setMessageListener(myListener);
-
- connection.start();
-
-
-//
-// // Close connection
-// // TODO keep connection open for re-use, efficiency
-// connection.close();
-
- return null;// createReport(message, xqcontext);
-
- } catch (Throwable ex) {
- LOG.error(ex);
- throw new XPathException(ex);
- }
- }
-
-// /*
-// *
-// */
-// private Message createMessage(Session session, Item item, MessagingMetadata mdd, XQueryContext xqcontext) throws JMSException, XPathException {
-//
-//
-// Message message = null;
-//
-// mdd.add("exist.datatype", Type.getTypeName(item.getType()));
-//
-// if (item.getType() == Type.ELEMENT || item.getType() == Type.DOCUMENT) {
-// LOG.debug("Streaming element or document node");
-//
-// if (item instanceof NodeProxy) {
-// NodeProxy np = (NodeProxy) item;
-// String uri = np.getDocument().getBaseURI();
-// LOG.debug("Document detected, adding URL " + uri);
-// mdd.add("exist.document-uri", uri);
-// }
-//
-// // Node provided
-// Serializer serializer = xqcontext.getBroker().newSerializer();
-//
-// NodeValue node = (NodeValue) item;
-// InputStream is = new NodeInputStream(serializer, node);
-//
-// ByteArrayOutputStream baos=new ByteArrayOutputStream();
-// try {
-// IOUtils.copy(is, baos);
-// } catch (IOException ex) {
-// LOG.error(ex);
-// throw new XPathException(ex);
-// }
-// IOUtils.closeQuietly(is);
-// IOUtils.closeQuietly(baos);
-//
-// BytesMessage bytesMessage = session.createBytesMessage();
-// bytesMessage.writeBytes(baos.toByteArray());
-//
-// message=bytesMessage;
-//
-//
-// } else if (item.getType() == Type.BASE64_BINARY || item.getType() == Type.HEX_BINARY) {
-// LOG.debug("Streaming base64 binary");
-//
-// if (item instanceof Base64BinaryDocument) {
-// Base64BinaryDocument b64doc = (Base64BinaryDocument) item;
-// String uri = b64doc.getUrl();
-// LOG.debug("Base64BinaryDocument detected, adding URL " + uri);
-// mdd.add("exist.document-uri", uri);
-// }
-//
-// BinaryValue binary = (BinaryValue) item;
-//
-// ByteArrayOutputStream baos=new ByteArrayOutputStream();
-// InputStream is = binary.getInputStream();
-//
-// //TODO consider using BinaryValue.getInputStream()
-// //byte[] data = (byte[]) binary.toJavaObject(byte[].class);
-//
-// try {
-// IOUtils.copy(is, baos);
-// } catch (IOException ex) {
-// LOG.error(ex);
-// throw new XPathException(ex);
-// }
-// IOUtils.closeQuietly(is);
-// IOUtils.closeQuietly(baos);
-//
-// BytesMessage bytesMessage = session.createBytesMessage();
-// bytesMessage.writeBytes(baos.toByteArray());
-//
-// message=bytesMessage;
-//
-//
-// } else {
-//
-// TextMessage textMessage = session.createTextMessage();
-// textMessage.setText(item.getStringValue());
-// message=textMessage;
-// }
-//
-// return message;
-// }
-
-// /**
-// * Create messaging results report
-// */
-// private NodeImpl createReport(Message message, XQueryContext xqcontext) {
-//
-// MemTreeBuilder builder = xqcontext.getDocumentBuilder();
-//
-// // start root element
-// int nodeNr = builder.startElement("", "JMS", "JMS", null);
-//
-// try {
-// String txt = message.getJMSMessageID();
-// if (txt != null) {
-// builder.startElement("", "MessageID", "MessageID", null);
-// builder.characters(message.getJMSMessageID());
-// builder.endElement();
-// }
-// } catch (JMSException ex) {
-// LOG.error(ex);
-// }
-//
-// try {
-// String txt = message.getJMSCorrelationID();
-// if (txt != null) {
-// builder.startElement("", "CorrelationID", "CorrelationID", null);
-// builder.characters(message.getJMSCorrelationID());
-// builder.endElement();
-// }
-// } catch (JMSException ex) {
-// LOG.error(ex);
-// }
-//
-// try {
-// String txt = message.getJMSType();
-// if (txt != null) {
-// builder.startElement("", "Type", "Type", null);
-// builder.characters(message.getJMSType());
-// builder.endElement();
-// }
-// } catch (JMSException ex) {
-// LOG.error(ex);
-// }
-//
-// // finish root element
-// builder.endElement();
-//
-// // return result
-// return ((DocumentImpl) builder.getDocument()).getNode(nodeNr);
-//
-//
-// }
-}
View
257 extensions/replication/src/org/exist/messaging/JmsMessageSender.java
@@ -1,257 +0,0 @@
-
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.log4j.Logger;
-import org.exist.dom.NodeProxy;
-import org.exist.memtree.DocumentImpl;
-import org.exist.memtree.MemTreeBuilder;
-import org.exist.memtree.NodeImpl;
-import org.exist.messaging.configuration.JmsMessagingConfiguration;
-import org.exist.messaging.configuration.MessagingMetadata;
-import org.exist.storage.serializers.Serializer;
-import org.exist.validation.internal.node.NodeInputStream;
-import org.exist.xquery.XPathException;
-import org.exist.xquery.XQueryContext;
-import org.exist.xquery.value.*;
-
-/**
- *
- * @author wessels
- */
-public class JmsMessageSender implements MessageSender {
-
- private final static Logger LOG = Logger.getLogger(JmsMessageSender.class);
- private XQueryContext xqcontext;
-
- public JmsMessageSender(XQueryContext context) {
- xqcontext = context.copyContext();
- }
-
- @Override
- public NodeImpl send(JmsMessagingConfiguration config, MessagingMetadata metadata, Item content) throws XPathException {
-
- // JMS specific checks
- config.validateContent();
-
- // Retrieve relevant values
- String initialContextFactory = config.getInitalContextProperty(Context.INITIAL_CONTEXT_FACTORY);
- String providerURL = config.getInitalContextProperty(Context.PROVIDER_URL);
- String connectionFactory = config.getConnectionFactory();
- String destination = config.getDestination();
-
-
- // TODO split up, use more exceptions, add better reporting
- try {
- Properties props = new Properties();
- props.setProperty(Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
- props.setProperty(Context.PROVIDER_URL, providerURL);
- javax.naming.Context context = new InitialContext(props);
-
- // Setup connection
- ConnectionFactory cf = (ConnectionFactory) context.lookup(connectionFactory);
- Connection connection = cf.createConnection();
-
- // Lookup queue
- Destination dest = (Destination) context.lookup(destination);
-
- // Create session
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Create message producer
- MessageProducer producer = session.createProducer(dest);
-
-
- // Create message
- Message message = createMessage(session, content, metadata, xqcontext);
-
- // Write properties
- Map<String, String> kvs = metadata.getValueMap();
- for (String key : kvs.keySet()) {
- message.setStringProperty(key, kvs.get(key));
- }
-
-
- // Send message
- producer.send(message);
-
- // Close connection
- // TODO keep connection open for re-use, efficiency
- connection.close();
-
- return createReport(message, xqcontext);
-
- } catch (Throwable ex) {
- LOG.error(ex);
- throw new XPathException(ex);
- }
- }
-
-
- private Message createMessage(Session session, Item item, MessagingMetadata mdd, XQueryContext xqcontext) throws JMSException, XPathException {
-
-
- Message message = null;
-
- mdd.add("exist.datatype", Type.getTypeName(item.getType()));
-
- if (item.getType() == Type.ELEMENT || item.getType() == Type.DOCUMENT) {
- LOG.debug("Streaming element or document node");
-
- if (item instanceof NodeProxy) {
- NodeProxy np = (NodeProxy) item;
- String uri = np.getDocument().getBaseURI();
- LOG.debug("Document detected, adding URL " + uri);
- mdd.add("exist.document-uri", uri);
- }
-
- // Node provided
- Serializer serializer = xqcontext.getBroker().newSerializer();
-
- NodeValue node = (NodeValue) item;
- InputStream is = new NodeInputStream(serializer, node);
-
- ByteArrayOutputStream baos=new ByteArrayOutputStream();
- try {
- IOUtils.copy(is, baos);
- } catch (IOException ex) {
- LOG.error(ex);
- throw new XPathException(ex);
- }
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(baos);
-
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(baos.toByteArray());
-
- message=bytesMessage;
-
-
- } else if (item.getType() == Type.BASE64_BINARY || item.getType() == Type.HEX_BINARY) {
- LOG.debug("Streaming base64 binary");
-
- if (item instanceof Base64BinaryDocument) {
- Base64BinaryDocument b64doc = (Base64BinaryDocument) item;
- String uri = b64doc.getUrl();
- LOG.debug("Base64BinaryDocument detected, adding URL " + uri);
- mdd.add("exist.document-uri", uri);
- }
-
- BinaryValue binary = (BinaryValue) item;
-
- ByteArrayOutputStream baos=new ByteArrayOutputStream();
- InputStream is = binary.getInputStream();
-
- //TODO consider using BinaryValue.getInputStream()
- //byte[] data = (byte[]) binary.toJavaObject(byte[].class);
-
- try {
- IOUtils.copy(is, baos);
- } catch (IOException ex) {
- LOG.error(ex);
- throw new XPathException(ex);
- }
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(baos);
-
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(baos.toByteArray());
-
- message=bytesMessage;
- } else if (item.getType() == Type.STRING){
- TextMessage textMessage = session.createTextMessage();
- textMessage.setText(item.getStringValue());
- message=textMessage;
-
- } else {
- ObjectMessage objectMessage = session.createObjectMessage();
- //objectMessage.setObject(item.toJavaObject(Object.class)); TODO hmmmm
- message=objectMessage;
- }
-
- return message;
- }
-
- /**
- * Create messaging results report
- *
- * TODO shared code
- */
- private NodeImpl createReport(Message message, XQueryContext xqcontext) {
-
- MemTreeBuilder builder = xqcontext.getDocumentBuilder();
-
- // start root element
- int nodeNr = builder.startElement("", "JMS", "JMS", null);
-
- try {
- String txt = message.getJMSMessageID();
- if (txt != null) {
- builder.startElement("", "MessageID", "MessageID", null);
- builder.characters(message.getJMSMessageID());
- builder.endElement();
- }
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- try {
- String txt = message.getJMSCorrelationID();
- if (txt != null) {
- builder.startElement("", "CorrelationID", "CorrelationID", null);
- builder.characters(message.getJMSCorrelationID());
- builder.endElement();
- }
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- try {
- String txt = message.getJMSType();
- if (txt != null) {
- builder.startElement("", "Type", "Type", null);
- builder.characters(message.getJMSType());
- builder.endElement();
- }
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- // finish root element
- builder.endElement();
-
- // return result
- return ((DocumentImpl) builder.getDocument()).getNode(nodeNr);
-
-
- }
-}
View
36 extensions/replication/src/org/exist/messaging/MessageReceiver.java
@@ -1,36 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging;
-
-import org.exist.memtree.NodeImpl;
-import org.exist.messaging.configuration.JmsMessagingConfiguration;
-import org.exist.xquery.XPathException;
-import org.exist.xquery.value.FunctionReference;
-
-/**
- *
- * @author Dannes Wessels
- */
-public interface MessageReceiver {
-
- public NodeImpl receive(JmsMessagingConfiguration jmc, FunctionReference ref) throws XPathException ;
-}
View
37 extensions/replication/src/org/exist/messaging/MessageSender.java
@@ -1,37 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging;
-
-import org.exist.memtree.NodeImpl;
-import org.exist.messaging.configuration.JmsMessagingConfiguration;
-import org.exist.messaging.configuration.MessagingMetadata;
-import org.exist.xquery.XPathException;
-import org.exist.xquery.value.Item;
-
-/**
- *
- * @author Dannes Wessels
- */
-public interface MessageSender {
-
- public NodeImpl send(JmsMessagingConfiguration jmc, MessagingMetadata mmd, Item content) throws XPathException ;
-}
View
191 extensions/replication/src/org/exist/messaging/MyListener.java
@@ -1,191 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging;
-
-import java.util.Enumeration;
-import javax.jms.*;
-import org.apache.log4j.Logger;
-import org.exist.memtree.DocumentImpl;
-import org.exist.memtree.MemTreeBuilder;
-import org.exist.memtree.NodeImpl;
-import org.exist.xquery.XQueryContext;
-import org.exist.xquery.value.FunctionReference;
-import org.exist.xquery.value.Item;
-import org.exist.xquery.value.Sequence;
-import org.exist.xquery.value.StringValue;
-
-/**
- * Handle incoming message by executing function with parameters
- * (JMS config, Metadata, payload)
- *
- * @author Dannes Wessels
- */
-public class MyListener implements MessageListener {
-
- private final static Logger LOG = Logger.getLogger(MyListener.class);
- private XQueryContext xqcontext;
- private FunctionReference ref = null;
-
- public MyListener(FunctionReference ref, XQueryContext xqcontext) {
- this.ref = ref;
- this.xqcontext = xqcontext;
- this.ref.setContext(this.xqcontext);
- }
-
- @Override
- public void onMessage(Message message) {
- try {
- LOG.info(message.getStringProperty("name") + " Id=" + message.getJMSMessageID() + " type=" + message.getJMSType());
-
- //NodeImpl report = createReport(message);
- Item content = null;
-
- LOG.info("report created");
-
-
- // Get data from message
- // TODO switch based on supplied content-type e.g. element(),
- // document-node()etc
-
- if (message instanceof TextMessage) {
-
- LOG.info("TextMessage");
-
- String txt = ((TextMessage) message).getText();
-
- LOG.info(txt);
- content = new StringValue(txt);
-
- } else if (message instanceof BytesMessage) {
-
- LOG.info("BytesMessage");
-
- BytesMessage bm = (BytesMessage) message;
-
- LOG.info("length=" + bm.getBodyLength());
-
- byte[] data = new byte[(int) bm.getBodyLength()];
-
- bm.readBytes(data);
-
- String txt = new String(data);
- LOG.info("to be converted '" + txt + "'");
-
- content = new StringValue(txt);
-
- }
-
-
- // Get Meta data from JMS
- // TODO wrap into node structure, flat, or element sequence.
- Enumeration names = message.getPropertyNames();
- for (Enumeration<?> e = names; e.hasMoreElements();) {
- String key = (String) e.nextElement();
- LOG.info(key + " == " + message.getStringProperty(key));
- }
-
- // Call function
-
- // Construct parameters
- Sequence[] params = new Sequence[3];
- params[0] = new StringValue(".....0"); // report; // report
- params[1] = new StringValue(".....1"); //= report; // meta data
- params[2] = new StringValue(".....2"); //= report; // content
-
- // Execute function
- try {
- LOG.info("execute");
-
- /* Sequence ret = */ ref.evalFunction(null, null, params);
-
- // Never reaches here, due to NPE.
- LOG.info("done");
-
- } catch (Throwable e) {
- // Catch all issues.
- LOG.error(e.getMessage(), e);
- e.printStackTrace();
- }
-
-
-
- } catch (JMSException ex) {
- LOG.error(ex);
- ex.printStackTrace();
- }
- }
-
- /**
- * Create messaging results report
- *
- * TODO shared code, except context (new copied)
- */
- private NodeImpl createReport(Message message) {
-
- MemTreeBuilder builder = new MemTreeBuilder();
- builder.startDocument();
-
- // start root element
- int nodeNr = builder.startElement("", "JMS", "JMS", null);
-
- try {
- String txt = message.getJMSMessageID();
- if (txt != null) {
- builder.startElement("", "MessageID", "MessageID", null);
- builder.characters(message.getJMSMessageID());
- builder.endElement();
- }
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- try {
- String txt = message.getJMSCorrelationID();
- if (txt != null) {
- builder.startElement("", "CorrelationID", "CorrelationID", null);
- builder.characters(message.getJMSCorrelationID());
- builder.endElement();
- }
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- try {
- String txt = message.getJMSType();
- if (txt != null) {
- builder.startElement("", "Type", "Type", null);
- builder.characters(message.getJMSType());
- builder.endElement();
- }
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- // finish root element
- builder.endElement();
-
- // return result
- return ((DocumentImpl) builder.getDocument()).getNode(nodeNr);
-
-
- }
-}
View
74 extensions/replication/src/org/exist/messaging/configuration/JmsMessagingConfiguration.java
@@ -1,74 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging.configuration;
-
-import javax.naming.Context;
-import org.exist.xquery.XPathException;
-
-
-
-/**
- *
- * @author wessels
- */
-public class JmsMessagingConfiguration extends MessagingConfiguration {
-
- public String getConnectionFactory() {
- String baseName = getRootName();
- return getRawConfigurationItem(baseName + "." + "ConnectionFactory");
- }
-
- public String getDestination() {
- String baseName = getRootName();
- return getRawConfigurationItem(baseName + "." + "Destination");
- }
-
- public String getInitalContextProperty(String key){
- String baseName = getRootName();
- return getRawConfigurationItem(baseName + ".InitialContext." + key);
- }
-
- @Override
- public void validateContent() throws XPathException {
-
- String initialContextFactory = getInitalContextProperty(Context.INITIAL_CONTEXT_FACTORY);
- if(initialContextFactory==null){
- throw new XPathException("Missing configuration item '" + Context.INITIAL_CONTEXT_FACTORY+"'");
- }
-
- String providerURL = getInitalContextProperty(Context.PROVIDER_URL);
- if(providerURL==null){
- throw new XPathException("Missing configuration item '" + Context.PROVIDER_URL +"'");
- }
-
- String connectionFactory = getConnectionFactory();
- if(connectionFactory==null){
- throw new XPathException("Missing configuration item 'ConnectionFactory'");
- }
-
- String destination = getDestination();
- if(destination==null){
- throw new XPathException("Missing configuration item 'Destination'");
- }
-
- }
-}
View
40 extensions/replication/src/org/exist/messaging/configuration/MessagingConfiguration.java
@@ -1,40 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging.configuration;
-
-import org.exist.xquery.XPathException;
-
-
-
-/**
- *
- * @author wessels
- */
-
-
-public class MessagingConfiguration extends NodeParser {
-
-
- protected void validateContent() throws XPathException {
- throw new XPathException("not implemented");
- }
-}
View
35 extensions/replication/src/org/exist/messaging/configuration/MessagingMetadata.java
@@ -1,35 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging.configuration;
-
-/**
- *
- * @author wessels
- */
-
-
-public class MessagingMetadata extends NodeParser {
-
- public void add(String key, String value){
- getRawValueMap().put(key, value);
- }
-}
View
121 extensions/replication/src/org/exist/messaging/configuration/NodeParser.java
@@ -1,121 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging.configuration;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.exist.xquery.value.NodeValue;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-
-/**
- * Helper class for parsing an exist-db node.<BR>
- *
- * COnverts a structure like
- *
- * <A> <B><C>dd</C></B> <E><F>gg</F></E> </A>
- *
- * into a map like
- *
- * A.B.C = dd A.B.D = gg
- *
- * @author Dannes Wessels (dannes@exist-db.org)
- */
-public abstract class NodeParser {
-
- private Map<String, String> valueMap = new HashMap<String, String>();
- private String rootName;
-
- public String getRawConfigurationItem(String key) {
- return valueMap.get(key);
- }
-
- public String getConfigurationItem(String key) {
- return valueMap.get(rootName + "." + key);
- }
-
- public String getRootName() {
- return rootName;
- }
-
- public void parseDocument(NodeValue configNode) {
- Node doc = configNode.getNode();
- rootName = doc.getLocalName();
- parseNode(doc, rootName);
- }
-
- public Map<String, String> getRawValueMap(){
- return valueMap;
- }
-
- public Map<String, String> getValueMap(){
-
- Map<String, String> retVal = new HashMap<String, String>();
-
- String prefix=rootName + ".";
- int offset = prefix.length();
-
- for(String key: valueMap.keySet()){
-
- String value=valueMap.get(key);
-
- if(key.startsWith(prefix)){
- key = key.substring(offset);
- }
-
- retVal.put(key, value);
- }
-
- return retVal;
- }
-
- /**
- * Iterate over all child elements in node, if no child nodes present, read
- * value of element.
- *
- * @param node Node to be parsed
- * @param path path to current node, like a.b.c
- */
- private void parseNode(Node node, String path) {
-
- NodeList nodeList = node.getChildNodes();
- int length = nodeList.getLength();
-
- if (length > 0) {
-
- for (int i = 0; i < length; i++) {
-
- Node child = nodeList.item(i);
- if (child.getNodeType() == Node.ELEMENT_NODE) {
- parseNode(child, path + "." + child.getLocalName());
-
- } else if (child.getNodeType() == Node.TEXT_NODE) {
- valueMap.put(path, node.getNodeValue());
-
- } else {
- // ignore
- }
- }
-
- }
- }
-}
View
78 extensions/replication/src/org/exist/messaging/xquery/MessagingModule.java
@@ -1,78 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging.xquery;
-
-import java.util.List;
-import java.util.Map;
-import org.exist.dom.QName;
-import org.exist.xquery.AbstractInternalModule;
-import org.exist.xquery.FunctionDef;
-import org.exist.xquery.XPathException;
-
-/**
- *
- * @author wessels
- */
-public class MessagingModule extends AbstractInternalModule {
-
- public final static String NAMESPACE_URI = "http://exist-db.org/xquery/messaging";
- public final static String PREFIX = "messaging";
- public final static String INCLUSION_DATE = "2012-06-01";
- public final static String RELEASED_IN_VERSION = "eXist-2.1";
-
- public final static FunctionDef[] functions = { //new FunctionDef(JFreeCharting.signatures[0], JFreeCharting.class),
- new FunctionDef(SendMessage.signatures[0], SendMessage.class),
- new FunctionDef(ReceiveMessage.signatures[0], ReceiveMessage.class),
- };
-
- public final static QName EXCEPTION_QNAME =
- new QName("exception", MessagingModule.NAMESPACE_URI, MessagingModule.PREFIX);
-
- public final static QName EXCEPTION_MESSAGE_QNAME =
- new QName("exception-message", MessagingModule.NAMESPACE_URI, MessagingModule.PREFIX);
-
- public MessagingModule(Map<String, List<? extends Object>> parameters) throws XPathException {
- super(functions, parameters);
-// declareVariable(EXCEPTION_QNAME, null);
-// declareVariable(EXCEPTION_MESSAGE_QNAME, null);
- }
-
- @Override
- public String getNamespaceURI() {
- return NAMESPACE_URI;
- }
-
- @Override
- public String getDefaultPrefix() {
- return PREFIX;
- }
-
- @Override
- public String getDescription() {
- return "A module for sending messages";
- }
-
- @Override
- public String getReleaseVersion() {
- return RELEASED_IN_VERSION;
- }
-}
View
127 extensions/replication/src/org/exist/messaging/xquery/ReceiveMessage.java
@@ -1,127 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging.xquery;
-
-import org.exist.dom.QName;
-import org.exist.memtree.NodeImpl;
-import org.exist.messaging.JmsMessageReceiver;
-import org.exist.messaging.configuration.JmsMessagingConfiguration;
-import org.exist.xquery.*;
-import org.exist.xquery.value.*;
-
-/**
- *
- * @author wessels
- */
-
-
-public class ReceiveMessage extends BasicFunction {
-
- public final static FunctionSignature signatures[] = {
-
- new FunctionSignature(
- new QName("receive", MessagingModule.NAMESPACE_URI, MessagingModule.PREFIX),
- "Text1",
- new SequenceType[]{
- new FunctionParameterSequenceType("configuration", Type.NODE, Cardinality.EXACTLY_ONE, "text"),
- new FunctionParameterSequenceType("callback function", Type.FUNCTION_REFERENCE, Cardinality.ZERO_OR_ONE, "text"),
-// new FunctionParameterSequenceType("content", Type.ITEM, Cardinality.ZERO_OR_ONE,
-// "Send message to remote server")
- },
- new FunctionReturnSequenceType(Type.NODE, Cardinality.ZERO_OR_ONE, "Confirmation message, if present")
- ),
-
-
- };
-
- public ReceiveMessage(XQueryContext context, FunctionSignature signature) {
- super(context, signature);
- }
-
- @Override
- public Sequence eval(Sequence[] args, Sequence contextSequence) throws XPathException {
-
- /*
- xquery version "1.0";
-
- import module namespace messaging="http://exist-db.org/xquery/messaging"
- at "java:org.exist.messaging.xquery.MessagingModule";
-
-
- declare function local:index-callback($configuration as element(), $properties as element(), $content as item()) {
- util:log("INFO", $content)
- };
-
- let $config :=
- <jms>
- <InitialContext>
- <java.naming.factory.initial>org.apache.activemq.jndi.ActiveMQInitialContextFactory</java.naming.factory.initial>
- <java.naming.provider.url>tcp://localhost:61616</java.naming.provider.url>
- </InitialContext>
- <ConnectionFactory>ConnectionFactory</ConnectionFactory>
- <Destination>dynamicQueues/MyTestQ</Destination>
- </jms>
-
- let $callback := util:function(xs:QName("local:index-callback"), 3)
-
- return
- messaging:receive($config, $callback)
- */
-
-
-
- // Get configuration
- NodeValue configNode = (NodeValue) args[0].itemAt(0);
- JmsMessagingConfiguration jmc = new JmsMessagingConfiguration();
- jmc.parseDocument(configNode);
-
-// // Get additional header
-// NodeValue headersNode = (NodeValue) args[1].itemAt(0);
-// MessagingMetadata mmd = new MessagingMetadata();
-// mmd.parseDocument(headersNode);
-
- // Get function reference
- FunctionReference ref = (FunctionReference) args[1].itemAt(0);
-
-
- // Get content
-// Item content = args[2].itemAt(0);
-
-// if(content instanceof NodeProxy){
-// NodeProxy np = (NodeProxy) content;
-// mmd.add( "url" , np.getDocument().getBaseURI() );
-// }
-//
-//
-// mmd.add("exist.type", Type.getTypeName( content.getType() ));
-
- // Send content
- JmsMessageReceiver sender = new JmsMessageReceiver(context);
-
-
- NodeImpl result = sender.receive(jmc, ref);
-
- return result;
-
- }
-
-}
View
116 extensions/replication/src/org/exist/messaging/xquery/SendMessage.java
@@ -1,116 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.messaging.xquery;
-
-import org.exist.dom.QName;
-import org.exist.memtree.NodeImpl;
-import org.exist.messaging.JmsMessageSender;
-import org.exist.messaging.configuration.JmsMessagingConfiguration;
-import org.exist.messaging.configuration.MessagingMetadata;
-import org.exist.xquery.*;
-import org.exist.xquery.value.*;
-
-/**
- *
- * @author wessels
- */
-
-
-public class SendMessage extends BasicFunction {
-
- public final static FunctionSignature signatures[] = {
-
- new FunctionSignature(
- new QName("send", MessagingModule.NAMESPACE_URI, MessagingModule.PREFIX),
- "Text1",
- new SequenceType[]{
- new FunctionParameterSequenceType("configuration", Type.NODE, Cardinality.EXACTLY_ONE, "text"),
- new FunctionParameterSequenceType("properties", Type.NODE, Cardinality.ZERO_OR_ONE, "text"),
- new FunctionParameterSequenceType("content", Type.ITEM, Cardinality.ZERO_OR_ONE, "Send message to remote server")
- },
- new FunctionReturnSequenceType(Type.NODE, Cardinality.ZERO_OR_ONE, "Confirmation message, if present")
- ),
-
-
- };
-
- public SendMessage(XQueryContext context, FunctionSignature signature) {
- super(context, signature);
- }
-
- @Override
- public Sequence eval(Sequence[] args, Sequence contextSequence) throws XPathException {
-
- /*
- import module namespace messaging="http://exist-db.org/xquery/messaging"
- at "java:org.exist.messaging.xquery.MessagingModule";
-
- let $config :=
- <jms>
- <InitialContext>
- <java.naming.factory.initial>org.apache.activemq.jndi.ActiveMQInitialContextFactory</java.naming.factory.initial>
- <java.naming.provider.url>tcp://myserver.local:61616</java.naming.provider.url>
- </InitialContext>
- <ConnectionFactory>ConnectionFactory</ConnectionFactory>
- <Destination>dynamicQueues/MyTestQ</Destination>
- </jms>
-
- return
- messaging:send($config, <config/>, "My text"),
- messaging:send($config, <config><key1>value1</key1></config>, doc('/db/data.xml'))
- */
-
-
-
- // Get configuration
- NodeValue configNode = (NodeValue) args[0].itemAt(0);
- JmsMessagingConfiguration configuration = new JmsMessagingConfiguration();
- configuration.parseDocument(configNode);
-
- // Get additional header
- NodeValue headersNode = (NodeValue) args[1].itemAt(0);
- MessagingMetadata metaData = new MessagingMetadata();
- metaData.parseDocument(headersNode);
-
- // Get content
- Item content = args[2].itemAt(0);
-
-// if(content instanceof NodeProxy){
-// NodeProxy np = (NodeProxy) content;
-// mmd.add( "url" , np.getDocument().getBaseURI() );
-// }
-//
-//
-// mmd.add("exist.type", Type.getTypeName( content.getType() ));
-
- // Send content
- JmsMessageSender sender = new JmsMessageSender(context);
-
-
-
- NodeImpl result = sender.send(configuration, metaData, content);
-
- return result;
-
- }
-
-}
View
273 extensions/replication/src/org/exist/replication/jms/obsolete/FileSystemListener.java
@@ -1,273 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.replication.jms.obsolete;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.zip.GZIPInputStream;
-import javax.jms.*;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Logger;
-import org.exist.replication.shared.eXistMessage;
-
-
-/**
- * Listener for actual handling of JMS message.
- *
- * @author Dannes Wessels
- *
- */
-public class FileSystemListener implements MessageListener {
-
- private static File baseDir;
-
- private eXistMessage convertMessage(BytesMessage bm) {
- eXistMessage em = new eXistMessage();
-
- try {
- Enumeration e = bm.getPropertyNames();
- while(e.hasMoreElements()){
- Object next = e.nextElement();
- if(next instanceof String){
- em.getMetadata().put( (String) next, bm.getObjectProperty( (String) next) );
- }
- }
-
- String value = bm.getStringProperty(eXistMessage.EXIST_RESOURCE_TYPE);
- eXistMessage.ResourceType resourceType = eXistMessage.ResourceType.valueOf(value);
- em.setResourceType(resourceType);
-
- value = bm.getStringProperty(eXistMessage.EXIST_RESOURCE_OPERATION);
- eXistMessage.ResourceOperation changeType = eXistMessage.ResourceOperation.valueOf(value);
- em.setResourceOperation(changeType);
-
- value = bm.getStringProperty(eXistMessage.EXIST_SOURCE_PATH);
- em.setResourcePath(value);
-
- value = bm.getStringProperty(eXistMessage.EXIST_DESTINATION_PATH);
- em.setDestinationPath(value);
-
- long size = bm.getBodyLength();
- LOG.debug("actual length=" + size);
-
- // This is potentially memory intensive
- byte[] payload = new byte[(int) size];
- bm.readBytes(payload);
- em.setPayload(payload);
-
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- return em;
-
- }
-
- public FileSystemListener() {
- baseDir = new File("clusteringTest");
- if (!baseDir.exists()) {
- LOG.info("Creating " + baseDir.getAbsolutePath());
- baseDir.mkdirs();
- }
- }
- private final static Logger LOG = Logger.getLogger(FileSystemListener.class);
-
- @Override
- public void onMessage(Message message) {
- try {
- LOG.info("JMSMessageID=" + message.getJMSMessageID());
-
- StringBuilder sb = new StringBuilder();
-
- // Write properties
- Enumeration names = message.getPropertyNames();
- for (Enumeration<?> e = names; e.hasMoreElements();) {
- String key = (String) e.nextElement();
- sb.append("'" + key + "='" + message.getStringProperty(key) + "' ");
- }
- LOG.info(sb.toString());
-
- // Handle message
- if (message instanceof TextMessage) {
- LOG.info(((TextMessage) message).getText());
-
- } else if (message instanceof BytesMessage) {
-
- BytesMessage bm = (BytesMessage) message;
-
- eXistMessage em = convertMessage(bm);
-
-
- switch (em.getResourceType()) {
- case DOCUMENT:
- handleDocument(em);
- break;
- case COLLECTION:
- handleCollection(em);
- break;
- default:
- LOG.error("Unknown resource type");
- break;
- }
-
- }
-
- } catch (JMSException ex) {
- LOG.error(ex);
- }
-
- }
-
- private void handleDocument(eXistMessage em) {
-
- LOG.info(em.getReport());
-
- // Get original path
- String resourcePath = em.getResourcePath();
-
- String[] srcSplitPath = splitPath(resourcePath);
- String srcDir = srcSplitPath[0];
- String srcDoc = srcSplitPath[1];
-
-
- File dir = new File(baseDir, srcDir);
- File file = new File(dir, srcDoc);
-
- switch (em.getResourceOperation()) {
- case CREATE:
- case UPDATE:
- // Create dirs if not existent
-
- dir.mkdirs();
-
- // Create file reference
-
- LOG.info(file.getAbsolutePath());
-
- try {
- // Prepare streams
- FileOutputStream fos = new FileOutputStream(file);
- ByteArrayInputStream bais = new ByteArrayInputStream(em.getPayload());
- GZIPInputStream gis = new GZIPInputStream(bais);
-
- // Copy and unzip
- IOUtils.copy(gis, fos);
-
- // Cleanup
- IOUtils.closeQuietly(fos);
- IOUtils.closeQuietly(gis);
- } catch (IOException ex) {
- LOG.error(ex);
-
- }
- break;
-
- case DELETE:
- FileUtils.deleteQuietly(file);
- break;
-
- case MOVE:
- File mvFile = new File(baseDir, em.getDestinationPath());
- try {
- FileUtils.moveFile(file, mvFile);
- } catch (IOException ex) {
- LOG.error(ex);
- }
- break;
-
- case COPY:
- File cpFile = new File(baseDir, em.getDestinationPath());
- try {
- FileUtils.copyFile(file, cpFile);
- } catch (IOException ex) {
- LOG.error(ex);
- }
- break;
-
- default:
- LOG.error("Unknown change type");
- }
- }
-
- private String[] splitPath(String fullPath) {
- String directory, documentname;
- int separator = fullPath.lastIndexOf("/");
- if (separator == -1) {
- directory = "";
- documentname = fullPath;
- } else {
- directory = fullPath.substring(0, separator);
- documentname = fullPath.substring(separator + 1);
- }
-
- return new String[]{directory, documentname};
- }
-
- private void handleCollection(eXistMessage em) {
-
- File src = new File(baseDir, em.getResourcePath());
-
-
- switch (em.getResourceOperation()) {
- case CREATE:
- case UPDATE:
- try {
- // Create dirs if not existent
- FileUtils.forceMkdir(src);
- } catch (IOException ex) {
- LOG.error(ex);
- }
-
- break;
-
- case DELETE:
- FileUtils.deleteQuietly(src);
- break;
-
- case MOVE:
- File mvDest = new File(baseDir, em.getDestinationPath());
- try {
- FileUtils.moveDirectoryToDirectory(src, mvDest, true);
- } catch (IOException ex) {
- LOG.error(ex);
- }
- break;
-
- case COPY:
-
- File cpDest = new File(baseDir, em.getDestinationPath());
- try {
- FileUtils.copyDirectoryToDirectory(src, cpDest);
- } catch (IOException ex) {
- LOG.error(ex);
- }
- break;
-
- default:
- LOG.error("Unknown change type");
- }
- }
-}
View
83 extensions/replication/src/org/exist/replication/jms/obsolete/ResourceReplicator.java
@@ -1,83 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.replication.jms.obsolete;
-
-import java.util.Properties;
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-
-/**
- * Helperclass for receiving JMS messages
- *
- * @author Dannes Wessels
- */
-public class ResourceReplicator {
-
- private final static Logger LOG = Logger.getLogger(ResourceReplicator.class);
-
- /**
- * @param args the command line arguments
- */
- public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
-
- BasicConfigurator.resetConfiguration();
- BasicConfigurator.configure();
-
- try {
- Properties props = new Properties();
- props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- props.setProperty(Context.PROVIDER_URL, "tcp://miniserver.local:61616");
- Context context = new InitialContext(props);
-
- ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory");
-
- FileSystemListener myListener = new FileSystemListener();
-
- Destination destination = (Destination) context.lookup("dynamicTopics/eXistdb");
-
- LOG.info("Destination=" + destination);
-
- Connection connection = connectionFactory.createConnection();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer messageConsumer = session.createConsumer(destination);
-
- messageConsumer.setMessageListener(myListener);
-
- connection.start();
-
-
- LOG.info("Receiver is ready");
-
- } catch (Throwable t) {
- LOG.error(t.getMessage(),t);
- }
-
-
-
- }
-}
View
83 extensions/replication/src/org/exist/replication/jms/obsolete/Sender.java
@@ -1,83 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.replication.jms.obsolete;
-
-import java.util.Date;
-import java.util.Properties;
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-
-/**
- * Helperclass for sending JMS messages
- *
- * @author Dannes Wessels
- */
-public class Sender {
-
- private final static Logger LOG = Logger.getLogger(Sender.class);
-
- /**
- * @param args the command line arguments
- */
- public static void main(String[] args) {
- BasicConfigurator.configure();
-
- try {
-
- Properties props = new Properties();
- props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- props.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");
- javax.naming.Context context = new InitialContext(props);
-
-
- ConnectionFactory cf = (ConnectionFactory) context.lookup("ConnectionFactory");
- Connection connection = cf.createConnection();
-
- Destination destination = (Destination) context.lookup("dynamicQueues/eXistdb");
-
-
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = session.createProducer(destination);
- Message message = session.createTextMessage();
- message.setStringProperty("name", "testerdetest " + new Date().toString());
-
-
- producer.send(message);
-
-
-
- connection.close();
-
- LOG.info("sent " + message.getJMSMessageID());
-
- } catch (Throwable t) {
- t.printStackTrace();
- }
-
- System.exit(0);
- }
-}
View
229 extensions/replication/src/org/exist/replication/jms/publish/JMSMessageSender.java
@@ -1,229 +0,0 @@
-/*
- * eXist Open Source Native XML Database
- * Copyright (C) 2012 The eXist Project
- * http://exist-db.org
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * $Id$
- */
-package org.exist.replication.jms.publish;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import org.apache.log4j.Logger;
-import org.exist.replication.shared.JmsConnectionHelper;
-import org.exist.replication.shared.MessageSender;
-import org.exist.replication.shared.TransportException;
-import org.exist.replication.shared.eXistMessage;
-
-/**
- * Specific class for sending a eXistMessage via JMS to a broker
- *
- * @author Dannes Wessels
- */
-public class JMSMessageSender implements MessageSender {
-
- private final static Logger LOG = Logger.getLogger(JMSMessageSender.class);
- private PublisherParameters parameters = new PublisherParameters();
-
- /**
- * Constructor
- *
- * @param parameters Set of (Key,value) parameters for setting JMS routing
- * instructions, like java.naming.* , destination and connection factory.
- */
- JMSMessageSender(Map<String, List<?>> params) {
- parameters.setMultiValueParameters(params);
- }
-
- /**
- * Helper method to give resources back
- */
- private void closeAll(Context context, Connection connection, Session session) {
-
- boolean doLog = LOG.isDebugEnabled();
-
- if (session != null) {
- if (doLog) {
- LOG.debug("Closing session");
- }
-
- try {
- session.close();
- } catch (JMSException ex) {
- LOG.error(ex);
- }
- }
-
- if (connection != null) {
- if (doLog) {
- LOG.debug("Closing connection");
- }
-
- try {
- connection.close();
- } catch (JMSException ex) {
- LOG.error(ex);
- }
- }
-
- if (context != null) {
- if (doLog) {
- LOG.debug("Closing context");
- }
-
- try {
- context.close();
- } catch (NamingException ex) {
- LOG.error(ex);
- }
- }
- }
-
- /**
- * Send {@link eXistMessage} to message broker.
- *
- * @param em The message that needs to be sent
- * @throws TransportException Thrown when something bad happens.
- */
- public void sendMessage(eXistMessage em) throws TransportException {
-
- // Get from .xconf file, fill defaults when needed
- parameters.processParameters();
-
- Properties contextProps = parameters.getInitialContextProps();
-
- if(LOG.isDebugEnabled()){
- LOG.debug(parameters.getReport());
- }
-
- Context context = null;
- Connection connection = null;
- Session session = null;
-
- try {
- // Setup context
- context = new InitialContext(contextProps);
-
- // Lookup connection factory
- ConnectionFactory cf = (ConnectionFactory) context.lookup(parameters.getConnectionFactory());
-
- // Set specific properties on the connection factory
- JmsConnectionHelper.configureConnectionFactory(cf, parameters);
-
- // Setup connection
- connection = cf.createConnection();
-
- // Set clientId if present
- String clientId = parameters.getClientId();
- if (clientId != null) {
- connection.setClientID(clientId);
- }
-
- // TODO DW: should this be configurable?
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Lookup topic
- Destination destination = (Destination) context.lookup(parameters.getTopic());
- if (!(destination instanceof Topic)) {
- String errorText = String.format("'%s' is not a Topic.", parameters.getTopic());
- LOG.error(errorText);
- throw new TransportException(errorText);
- }
-
- // Create message
- MessageProducer producer = session.createProducer(destination);
-
- // Set time-to-live is set
- Long timeToLive = parameters.getTimeToLive();
- if (timeToLive != null) {
- producer.setTimeToLive(timeToLive);
- }
-
- // Set priority if set
- Integer priority = parameters.getPriority();
- if (priority != null) {
- producer.setPriority(priority);
- }
-
- BytesMessage message = session.createBytesMessage();
-
- // Set payload when available
- byte[] payload = em.getPayload();
- if (payload != null) {
- message.writeBytes(payload); // check empty, collection!
- }
-
- // Set eXist-db clustering specific details
- message.setStringProperty(eXistMessage.EXIST_RESOURCE_OPERATION, em.getResourceOperation().name());
- message.setStringProperty(eXistMessage.EXIST_RESOURCE_TYPE, em.getResourceType().name());
- message.setStringProperty(eXistMessage.EXIST_SOURCE_PATH, em.getResourcePath());
-
- if (em.getDestinationPath() != null) {
- message.setStringProperty(eXistMessage.EXIST_DESTINATION_PATH, em.getDestinationPath());
- }
-
-
- // Set other details
- Map<String, Object> metaData = em.getMetadata();
- for (String item : metaData.keySet()) {
- Object value = metaData.get(item);
-
- if (value instanceof String) {
- message.setStringProperty(item, (String) value);