Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version mgr reuse and efficient event generation #197

Merged
merged 4 commits into from Dec 10, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;

import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.Event;
import javax.jms.JMSException;
import javax.jms.Message;
Expand All @@ -48,7 +47,7 @@ public class DefaultMessageFactory implements JMSEventMessageFactory {
+ "eventType";

@Override
public Message getMessage(final Event jcrEvent, final Session jcrSession,
public Message getMessage(final Event jcrEvent,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException {
final Message message = jmsSession.createMessage();
Expand Down
64 changes: 43 additions & 21 deletions fcrepo-jms/src/main/java/org/fcrepo/jms/legacy/LegacyMethod.java
Expand Up @@ -34,15 +34,15 @@
import java.util.List;
import java.util.Properties;

import javax.jcr.Node;
import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.abdera.model.Category;
import org.apache.abdera.model.Entry;
import org.fcrepo.kernel.utils.FedoraTypesUtils;
import org.fcrepo.jcr.FedoraJcrTypes;
import org.fcrepo.kernel.observer.FedoraEvent;
import org.slf4j.Logger;

/**
Expand All @@ -63,17 +63,16 @@ public class LegacyMethod {

private static final String MODIFY_OBJ_METHOD = "modifyObject";

private static final String PURGE_OBJ_METHOD = "purgeObject";
// pending JCR 2.1, there is no way to detect Obj/DS node types
private static final String PURGE_METHOD = "purge";

private static final String ADD_DS_METHOD = "addDatastream";

private static final String MODIFY_DS_METHOD = "modifyDatastream";

private static final String PURGE_DS_METHOD = "purgeDatastream";

private static final String[] METHODS = new String[] {INGEST_METHOD,
MODIFY_OBJ_METHOD, PURGE_OBJ_METHOD, ADD_DS_METHOD,
MODIFY_DS_METHOD, PURGE_DS_METHOD};
MODIFY_OBJ_METHOD, PURGE_METHOD, ADD_DS_METHOD,
MODIFY_DS_METHOD, };

private static final List<String> METHOD_NAMES = Arrays.asList(METHODS);

Expand Down Expand Up @@ -101,25 +100,27 @@ public class LegacyMethod {
* @param resource
* @throws RepositoryException
*/
public LegacyMethod(final Event jcrEvent, final Node resource)
public LegacyMethod(final Event jcrEvent)
throws RepositoryException {
this(EntryFactory.newEntry());

String wrappedType = (String) jcrEvent.getInfo().get(FedoraEvent.NODE_TYPE_KEY);
final boolean isDatastreamNode =
FedoraTypesUtils.isFedoraDatastream.apply(resource);
FedoraJcrTypes.FEDORA_DATASTREAM.equals(wrappedType);
final boolean isObjectNode =
FedoraTypesUtils.isFedoraObject.apply(resource) &&
!isDatastreamNode;
FedoraJcrTypes.FEDORA_OBJECT.equals(wrappedType);
final boolean isPurge = jcrEvent.getType() == Event.NODE_REMOVED;

if (isDatastreamNode || isObjectNode) {
String resource = getResource(jcrEvent);
if (isDatastreamNode || isObjectNode || isPurge) {
setMethodName(mapMethodName(jcrEvent.getType(), isObjectNode));
final String returnValue = getReturnValue(jcrEvent, resource);
final String returnValue = getReturnValue(jcrEvent);
setContent(getEntryContent(getMethodName(), returnValue));
if (isDatastreamNode) {
setPid(resource.getParent().getName());
setDsId(resource.getName());
setPid(name(parentPath(resource)));
setDsId(name(resource));
} else {
setPid(resource.getName());
setPid(name(resource));
}
} else {
setMethodName(null);
Expand All @@ -128,7 +129,7 @@ public LegacyMethod(final Event jcrEvent, final Node resource)
jcrEvent.getUserID() == null ? "unknown" : jcrEvent.getUserID();
setUserId(userID);
setModified(new Date(jcrEvent.getDate()));
setPath(resource.getPath());
setPath(resource);
}

/**
Expand Down Expand Up @@ -359,11 +360,10 @@ protected static String objectToString(final String obj,
return term;
}

protected static String getReturnValue(final Event jcrEvent,
final Node jcrNode) throws RepositoryException {
protected static String getReturnValue(final Event jcrEvent) throws RepositoryException {
switch (jcrEvent.getType()) {
case NODE_ADDED:
return jcrNode.getName();
return name(jcrEvent.getPath());
case NODE_REMOVED:
case PROPERTY_ADDED:
case PROPERTY_CHANGED:
Expand All @@ -380,12 +380,34 @@ protected static String mapMethodName(final int eventType,
case NODE_ADDED:
return isObjectNode ? INGEST_METHOD : ADD_DS_METHOD;
case NODE_REMOVED:
return isObjectNode ? PURGE_OBJ_METHOD : PURGE_DS_METHOD;
return PURGE_METHOD;
default :
return isObjectNode ? MODIFY_OBJ_METHOD : MODIFY_DS_METHOD;
}
}

protected static String name(String path) {
return path.substring(path.lastIndexOf('/') + 1);
}

protected static String getResource(final Event jcrEvent) throws RepositoryException {
switch (jcrEvent.getType()) {
case NODE_ADDED:
case NODE_REMOVED:
return jcrEvent.getPath();
case PROPERTY_ADDED:
case PROPERTY_CHANGED:
case PROPERTY_REMOVED:
return parentPath(jcrEvent.getPath());
default:
return null;
}
}

protected static String parentPath(String path) {
return path.substring(0,path.lastIndexOf('/'));
}

/**
* TODO
*
Expand Down
Expand Up @@ -21,8 +21,6 @@
import java.io.IOException;
import java.io.StringWriter;

import javax.jcr.Item;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.JMSException;
Expand All @@ -47,16 +45,10 @@ public LegacyMethodEventFactory() {

@Override
public Message getMessage(final Event jcrEvent,
final javax.jcr.Session jcrSession,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException {
LOGGER.trace("Received an event to transform.");
final String path = jcrEvent.getPath();
LOGGER.trace("Retrieved path from event.");
final Item item = jcrSession.getItem(path);
final Node resource = item.isNode() ? (Node)item : item.getParent();
LOGGER.trace("Retrieved node from event.");
final LegacyMethod legacy = new LegacyMethod(jcrEvent, resource);
final LegacyMethod legacy = new LegacyMethod(jcrEvent);
final StringWriter writer = new StringWriter();
legacy.writeTo(writer);
final String atomMessage = writer.toString();
Expand Down
Expand Up @@ -41,7 +41,6 @@ public interface JMSEventMessageFactory {
* @throws JMSException
*/
Message getMessage(final Event jcrEvent,
final javax.jcr.Session jcrSession,
final javax.jms.Session jmsSession) throws RepositoryException,
IOException, JMSException;
}
Expand Up @@ -23,7 +23,6 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.observation.Event;
import javax.jms.Connection;
Expand All @@ -47,9 +46,6 @@ public class JMSTopicPublisher {
@Inject
private EventBus eventBus;

@Inject
private Repository repo;

@Inject
private ActiveMQConnectionFactory connectionFactory;

Expand All @@ -64,8 +60,6 @@ public class JMSTopicPublisher {

private final Logger LOGGER = getLogger(JMSTopicPublisher.class);

private javax.jcr.Session session;

/**
* When an EventBus mesage is received, map it to our JMS
* message payload and push it onto the queue.
Expand All @@ -80,7 +74,7 @@ public void publishJCREvent(final Event fedoraEvent) throws JMSException,
RepositoryException, IOException {
LOGGER.debug("Received an event from the internal bus.");
final Message tm =
eventFactory.getMessage(fedoraEvent, session, jmsSession);
eventFactory.getMessage(fedoraEvent, jmsSession);
LOGGER.debug("Transformed the event to a JMS message.");
producer.send(tm);

Expand All @@ -102,8 +96,6 @@ public void acquireConnections() throws JMSException, RepositoryException {
jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = jmsSession.createProducer(jmsSession.createTopic("fedora"));
eventBus.register(this);

session = repo.login();
}

/**
Expand All @@ -119,6 +111,5 @@ public void releaseConnections() throws JMSException {
jmsSession.close();
connection.close();
eventBus.unregister(this);
session.logout();
}
}
@@ -1,5 +1,7 @@
ingest.datatype = xsd:string
ingest.parameter = objectPID
purge.datatype = xsd:string
purge.parameter = purgedDate
purgeObject.datatype = xsd:string
purgeObject.parameter = purgedDate
modifyObject.datatype = xsd:string
Expand Down
Expand Up @@ -90,6 +90,9 @@ public void acquireConnection() throws JMSException {

@After
public void releaseConnection() throws JMSException {
// ignore any remaining or queued messages
consumer.setMessageListener(new NoopListener());
// and shut the listening machinery down
logger.debug(this.getClass().getName() + " releasing JMS connection.");
consumer.close();
session.close();
Expand Down Expand Up @@ -131,8 +134,9 @@ public void testAtomStream() throws RepositoryException,
public void testAtomStreamNodePath() throws RepositoryException,
InterruptedException {
final int minEntriesSize = 2;
final Session session = repository.login();
session.getRootNode().addNode("test1/sigma").addMixin(FEDORA_OBJECT);
Session session = repository.login();
final String testPath = "/test1/sigma";
session.getRootNode().addNode(testPath.substring(1)).addMixin(FEDORA_OBJECT);
session.save();

waitForEntry(minEntriesSize);
Expand All @@ -143,6 +147,7 @@ public void testAtomStreamNodePath() throws RepositoryException,
}

String path = null;
String title = null;
assertEquals("Entries size not 2", entries.size(), 2);
for (final Entry entry : entries) {
final List<Category> categories = copyOf(entry.getCategories("xsd:string"));
Expand All @@ -153,11 +158,43 @@ public void testAtomStreamNodePath() throws RepositoryException,
p = cat.getTerm();
}
}
if (p.equals("/test1/sigma")) {
if (testPath.equals(p)) {
path = p;
title = entry.getTitle();
}
}
assertEquals("Got wrong path!", "/test1/sigma", path);
assertEquals("Got wrong path!", testPath, path);
assertEquals("Got wrong title/method!", "ingest", title);
entries.clear();
path = null;
title = null;
session = repository.login();
session.removeItem(testPath);
session.save();
waitForEntry(2);
session.logout();
if (entries.isEmpty()) {
fail("Waited a second, got no messages");
}

// wait for both the parent update and the removal message
assertEquals("Entries size not 2", entries.size(), 2);
for (final Entry entry : entries) {
final List<Category> categories = copyOf(entry.getCategories("xsd:string"));
String p = null;
for (final Category cat : categories) {
if (cat.getLabel().equals("path")) {
logger.debug("Found Category with term: " + cat.getTerm());
p = cat.getTerm();
}
}
if (p.equals(testPath)) {
path = p;
title = entry.getTitle();
}
}
assertEquals("Got wrong path!", testPath, path);
assertEquals("Got wrong title/method!", "purge", title);
}

@Test
Expand Down Expand Up @@ -226,10 +263,10 @@ public void testDatastreamTerm() throws RepositoryException,

@Override
public void onMessage(final Message message) {
logger.debug("Received JMS message: " + message.toString());

final TextMessage tMessage = (TextMessage) message;
try {
logger.debug("Received JMS message: " + tMessage.getText());
if (LegacyMethod.canParse(message)) {
final LegacyMethod legacy = new LegacyMethod(tMessage.getText());
final Entry entry = legacy.getEntry();
Expand Down Expand Up @@ -258,5 +295,4 @@ private void waitForEntry(final int size) throws InterruptedException {
}
}
}

}
Expand Up @@ -191,6 +191,9 @@ public void acquireConnection() throws JMSException {

@After
public void releaseConnection() throws JMSException {
// ignore any remaining or queued messages
consumer.setMessageListener(new NoopListener());
// and shut the listening machinery down
LOGGER.debug(this.getClass().getName() + " releasing JMS connection.");
consumer.close();
session.close();
Expand Down
@@ -0,0 +1,28 @@
/**
* Copyright 2013 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.integration.jms.observer;

import javax.jms.Message;
import javax.jms.MessageListener;

class NoopListener implements MessageListener {

@Override
public void onMessage(Message arg0) {

}

}
Expand Up @@ -69,7 +69,7 @@ public void testBuildMessage() throws RepositoryException, IOException,
REPOSITORY_NAMESPACE + EventType.valueOf(NODE_ADDED).toString();
when(mockEvent.getType()).thenReturn(testType);
final Message testMessage =
testDefaultMessageFactory.getMessage(mockEvent, null, mockSession);
testDefaultMessageFactory.getMessage(mockEvent, mockSession);
assertEquals("Got wrong date in message!", testDate, (Long) testMessage
.getLongProperty(TIMESTAMP_HEADER_NAME));
assertEquals("Got wrong identifier in message!", testPath, testMessage
Expand Down