Permalink
Browse files

Indexing workflow complete, testing blocked on https://www.pivotaltra…

  • Loading branch information...
1 parent 05ad676 commit f2f17665b91dff600cad9a221c7a5a8727e4bc09 ajs6f committed Dec 8, 2013
@@ -109,11 +109,23 @@
<groupId>org.apache.jena</groupId>
<artifactId>jena-fuseki</artifactId>
<scope>test</scope>
+<!-- <exclusions>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions> -->
</dependency>
<!-- Start of Solr Indexer libs -->
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- <dependency>
<groupId>org.apache.solr</groupId>
@@ -18,6 +18,8 @@
*/
package org.fcrepo.indexer;
+import static org.slf4j.LoggerFactory.getLogger;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -26,6 +28,7 @@
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
+import org.slf4j.Logger;
/**
@@ -40,20 +43,26 @@
private byte[] cache;
+ private static final Logger LOGGER = getLogger(CachingRetriever.class);
+
+
/* (non-Javadoc)
* @see java.util.concurrent.Callable#call()
*/
@Override
public InputStream call() throws ClientProtocolException, IOException,
AbsentTransformPropertyException, HttpException {
if (cached) {
+ LOGGER.debug("Returning cached content...");
return new ByteArrayInputStream(cache);
}
+ LOGGER.debug("Retrieving uncached content...");
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
retrieveHttpResponse().getEntity().writeTo(out);
cache = out.toByteArray();
}
cached = true;
+ LOGGER.debug("Retrieved cache-able content:\n{}", new String(cache));
return new ByteArrayInputStream(cache);
}
@@ -17,7 +17,11 @@
package org.fcrepo.indexer;
import static com.google.common.base.Throwables.propagate;
+import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
import static com.hp.hpl.jena.rdf.model.ResourceFactory.createProperty;
+import static com.hp.hpl.jena.rdf.model.ResourceFactory.createResource;
+import static com.hp.hpl.jena.vocabulary.RDF.type;
+import static java.lang.Integer.MAX_VALUE;
import static javax.jcr.observation.Event.NODE_REMOVED;
import static org.fcrepo.kernel.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.slf4j.LoggerFactory.getLogger;
@@ -36,18 +40,19 @@
import org.apache.http.HttpException;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.fcrepo.kernel.utils.EventType;
import org.slf4j.Logger;
+import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.Property;
-
+import com.hp.hpl.jena.rdf.model.Resource;
/**
* MessageListener implementation that retrieves objects from the repository and
- * invokes one or more indexers to index the content.
- *
- * documentation:
+ * invokes one or more indexers to index the content. documentation:
* https://wiki.duraspace.org/display/FF/Design+-+Messaging+for+Workflow
+ *
* @author Esmé Cowles
* @author ajs6f
* @date Aug 19 2013
@@ -62,7 +67,6 @@
private final HttpClient httpClient;
-
/**
* Identifier message header
*/
@@ -81,11 +85,21 @@
private static final String REMOVAL_EVENT_TYPE = REPOSITORY_NAMESPACE
+ EventType.valueOf(NODE_REMOVED).toString();
+ public static final String INDEXER_NAMESPACE =
+ "http://fedora.info/definitions/v4/indexing#";
+
/**
- * Indicates the transformation to use with this resource to derive indexing information.
+ * Indicates the transformation to use with this resource to derive indexing
+ * information.
*/
public static final Property INDEXING_TRANSFORM_PREDICATE =
- createProperty(REPOSITORY_NAMESPACE + "hasIndexingTransformation");
+ createProperty(INDEXER_NAMESPACE + "hasIndexingTransformation");
+
+ /**
+ * Indicates that a resource is indexable.
+ */
+ public static final Resource INDEXABLE_MIXIN =
+ createResource(INDEXER_NAMESPACE + "indexable");
private static final Reader EMPTY_CONTENT = null;
@@ -94,7 +108,11 @@
**/
public IndexerGroup() {
LOGGER.debug("Creating IndexerGroup: {}", this);
- this.httpClient = new DefaultHttpClient();
+ final PoolingClientConnectionManager connMann =
+ new PoolingClientConnectionManager();
+ connMann.setMaxTotal(MAX_VALUE);
+ connMann.setDefaultMaxPerRoute(MAX_VALUE);
+ this.httpClient = new DefaultHttpClient(connMann);
}
/**
@@ -139,75 +157,106 @@ public void onMessage(final Message message) {
}
try {
// get pid and eventType from message
- final String pid = message.getStringProperty(IDENTIFIER_HEADER_NAME);
- final String eventType = message.getStringProperty(EVENT_TYPE_HEADER_NAME);
+ final String pid =
+ message.getStringProperty(IDENTIFIER_HEADER_NAME);
+ final String eventType =
+ message.getStringProperty(EVENT_TYPE_HEADER_NAME);
LOGGER.debug("Discovered pid: {} in message.", pid);
LOGGER.debug("Discovered event type: {} in message.", eventType);
final Boolean removal = REMOVAL_EVENT_TYPE.equals(eventType);
LOGGER.debug("It is {} that this is a removal operation.", removal);
-
- final RdfRetriever rdfr =
- new RdfRetriever(getRepositoryURL() + pid, httpClient);
+ final String uri = getRepositoryURL() + pid;
+ final RdfRetriever rdfr = new RdfRetriever(uri, httpClient);
final NamedFieldsRetriever nfr =
- new NamedFieldsRetriever(getRepositoryURL() + pid, httpClient,
- rdfr);
-
- for (final Indexer indexer : indexers) {
- LOGGER.debug("Operating for indexer: {}", indexer);
- Boolean hasContent = false;
- Reader content = EMPTY_CONTENT;
- if (!removal) {
- switch (indexer.getIndexerType()) {
- case NAMEDFIELDS:
- try (final InputStream result = nfr.call()) {
- content = new InputStreamReader(result);
- hasContent = true;
- } catch (final IOException | HttpException e) {
- LOGGER.error(
- "Could not retrieve content for update!",
- e);
- hasContent = false;
- } catch (final AbsentTransformPropertyException e) {
- hasContent = false;
- }
- case RDF:
- try (final InputStream result = rdfr.call()) {
- content = new InputStreamReader(result);
+ new NamedFieldsRetriever(uri, httpClient, rdfr);
+ final Model rdf =
+ createDefaultModel().read(rdfr.call(), null, "N3");
+
+ if (rdf.contains(createResource(uri), type, INDEXABLE_MIXIN)) {
+ LOGGER.debug("Discovered indexable type on this resource.");
+ for (final Indexer indexer : getIndexers()) {
+ LOGGER.debug("Operating for indexer: {}", indexer);
+ Boolean hasContent = false;
+ Reader content = EMPTY_CONTENT;
+ if (!removal) {
+ switch (indexer.getIndexerType()) {
+ case NAMEDFIELDS:
+ LOGGER.debug(
+ "Retrieving named fields for: {}, (may be cached) to index to {}...",
+ pid, indexer);
+ try (final InputStream result = nfr.call()) {
+ content = new InputStreamReader(result);
+ hasContent = true;
+ } catch (final IOException | HttpException e) {
+ LOGGER.error(
+ "Could not retrieve content for update of: {} to indexer {}!",
+ pid, indexer);
+ LOGGER.error("with exception:", e);
+ hasContent = false;
+ } catch (final AbsentTransformPropertyException e) {
+ hasContent = false;
+ }
+ break;
+ case RDF:
+ LOGGER.debug(
+ "Retrieving RDF for: {}, (may be cached) to index to {}...",
+ pid, indexer);
+ try (final InputStream result = rdfr.call()) {
+ content = new InputStreamReader(result);
+ hasContent = true;
+ } catch (IOException | HttpException e) {
+ LOGGER.error(
+ "Could not retrieve content for update of: {} to indexer {}!",
+ pid, indexer);
+ LOGGER.error("with exception:", e);
+ hasContent = false;
+ } catch (final AbsentTransformPropertyException e1) {
+ hasContent = false;
+ }
+ break;
+ default:
+ content =
+ new StringReader(
+ "Default content for update: "
+ + pid);
hasContent = true;
- } catch (IOException | HttpException e) {
- LOGGER.error(
- "Could not retrieve content for update!",
- e);
- hasContent = false;
- } catch (final AbsentTransformPropertyException e1) {
- hasContent = false;
- }
- default:
- content = new StringReader(pid);
- hasContent = true;
+ break;
+ }
}
- }
- try {
- if (removal) {
- indexer.remove(pid);
- } else {
- if (hasContent) {
- indexer.update(pid, content);
+ try {
+ if (removal) {
+ LOGGER.debug(
+ "Executing removal of: {} to indexer: {}...",
+ pid, indexer);
+ indexer.remove(pid);
} else {
- LOGGER.error(
- "Received update for {} but was unable to retrieve content for update!",
- pid);
+ if (hasContent) {
+ LOGGER.debug(
+ "Executing update of: {} to indexer: {}...",
+ pid, indexer);
+ indexer.update(pid, content);
+ } else {
+ LOGGER.error(
+ "Received update for: {} but was unable to retrieve "
+ + "content for update to indexer: {}!",
+ pid, indexer);
+ }
}
+ } catch (final Exception e) {
+ LOGGER.error("Error indexing {}: {}!", pid, e);
}
- } catch (final Exception e) {
- LOGGER.error("Error indexing {}: {}!", pid, e);
}
+ } else {
+ LOGGER.info("Resource retrieved without indexable type. Will not index.");
}
- } catch (final JMSException e) {
+ } catch (final JMSException | IOException | HttpException e) {
LOGGER.error("Error processing JMS event!", e);
+ } catch (final AbsentTransformPropertyException e2) {
+ // cannot be thrown here: simply an artifact of Java's crappy type
+ // system
}
}
@@ -65,8 +65,11 @@ public NamedFieldsRetriever(final String uri, final HttpClient client,
public HttpResponse retrieveHttpResponse() throws AbsentTransformPropertyException,
ClientProtocolException, IOException, HttpException {
LOGGER.debug("Retrieving RDF representation from: {}", uri);
- final Model rdf = createDefaultModel().read(rdfr.call(), null);
+ final Model rdf = createDefaultModel().read(rdfr.call(), null, "N3");
if (!rdf.contains(createResource(uri), INDEXING_TRANSFORM_PREDICATE)) {
+ LOGGER.info(
+ "Found no property locating LDPath transform for: {}, will not retrieve transformed content.",
+ uri);
throw new AbsentTransformPropertyException(uri);
}
final RDFNode indexingTransform =
@@ -17,6 +17,7 @@
package org.fcrepo.indexer;
import static org.apache.http.HttpStatus.SC_OK;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
@@ -26,6 +27,8 @@
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.jena.riot.WebContent;
+import org.slf4j.Logger;
/**
* Retrieves RDF representations of resources for storage in a triplestore.
@@ -37,12 +40,15 @@
*/
public class RdfRetriever extends CachingRetriever {
- private static final String RDF_SERIALIZATION = "application/rdf+xml";
+ private static final String RDF_SERIALIZATION = WebContent.contentTypeN3;
private final String identifier;
private final HttpClient httpClient;
+ private static final Logger LOGGER = getLogger(RdfRetriever.class);
+
+
/**
* @param identifier
* @param client
@@ -57,11 +63,12 @@ public HttpResponse retrieveHttpResponse() throws ClientProtocolException, IOExc
HttpException {
final HttpUriRequest request = new HttpGet(identifier);
request.addHeader("Accept", RDF_SERIALIZATION);
+ LOGGER.debug("Retrieving RDF content from: {}...", request.getURI());
final HttpResponse response = httpClient.execute(request);
if (response.getStatusLine().getStatusCode() == SC_OK) {
return response;
} else {
- throw new HttpException(response.getStatusLine().getReasonPhrase());
+ throw new HttpException(response.getStatusLine().toString());
}
}
@@ -50,6 +50,9 @@
*/
public class SolrIndexer implements Indexer {
+ public static final String CONFIGURATION_FOLDER =
+ "fedora:system/fedora:transform/fedora:ldpath/";
+
private final SolrServer server;
private static final Logger LOGGER = getLogger(SolrIndexer.class);
@@ -138,9 +141,11 @@ public UpdateResponse call() throws Exception {
}));
}
- private static <T> ListenableFuture<T> run(
- final ListenableFutureTask<T> task) {
- task.run();
+ private <T> ListenableFuture<T> run(final ListenableFutureTask<T> task) {
+ synchronized (this) {
+ task.run();
+ notifyAll();
+ }
return task;
}
Oops, something went wrong.

0 comments on commit f2f1766

Please sign in to comment.