Skip to content

Commit

Permalink
Merge pull request #11 from acoburn/triplestore_processors
Browse files Browse the repository at this point in the history
Triplestore processors
  • Loading branch information
acoburn committed Nov 12, 2014
2 parents 0b60f20 + a159385 commit 1c22f5f
Show file tree
Hide file tree
Showing 10 changed files with 747 additions and 15 deletions.
42 changes: 28 additions & 14 deletions pom.xml
Expand Up @@ -22,6 +22,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<fcrepo.version>4.0.0-beta-05-SNAPSHOT</fcrepo.version>
<camel.version>2.14.0</camel.version>
<commons.lang3.version>3.3.2</commons.lang3.version>
<httpclient.version>4.3.5</httpclient.version>
<jena.fuseki.version>1.1.0</jena.fuseki.version>
<jena.version>2.12.1</jena.version>
Expand Down Expand Up @@ -136,6 +137,19 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang3.version}</version>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
<version>${jersey.version}</version>
</dependency>

<dependency>
Expand All @@ -153,6 +167,18 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.jena</groupId>
<artifactId>jena-arq</artifactId>
<version>${jena.version}</version>
</dependency>

<dependency>
<groupId>org.apache.jena</groupId>
<artifactId>jena-core</artifactId>
<version>${jena.version}</version>
</dependency>

<dependency>
<groupId>javax.ws.rs</groupId>
Expand Down Expand Up @@ -212,20 +238,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.jena</groupId>
<artifactId>jena-arq</artifactId>
<version>${jena.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.jena</groupId>
<artifactId>jena-core</artifactId>
<version>${jena.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>javax.jcr</groupId>
<artifactId>jcr</artifactId>
Expand Down Expand Up @@ -259,6 +271,8 @@
<includes>
<include>**/src/main/java/**</include>
<include>**/src/test/java/**</include>
<include>**/examples/fcrepo-camel-solr/src/main/java/**</include>
<include>**/examples/fcrepo-camel-solr-scala/src/main/scala/**</include>
</includes>
<excludes>
<exclude>target/**</exclude>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/fcrepo/camel/FedoraEndpoint.java
Expand Up @@ -34,7 +34,7 @@
@UriEndpoint(scheme = "fcrepo")
public class FedoraEndpoint extends DefaultEndpoint {

public static final String FCREPO_BASEURL = "FCREPO_BASEURL";
public static final String FCREPO_BASE_URL = "FCREPO_BASE_URL";

public static final String FCREPO_IDENTIFIER = "FCREPO_IDENTIFIER";

Expand Down
116 changes: 116 additions & 0 deletions src/main/java/org/fcrepo/camel/processor/SparqlDeleteProcessor.java
@@ -0,0 +1,116 @@
/**
* Copyright 2014 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.processor;

import static org.apache.camel.Exchange.HTTP_METHOD;
import static org.apache.camel.Exchange.CONTENT_TYPE;
import static org.fcrepo.camel.FedoraEndpoint.FCREPO_BASE_URL;
import static org.fcrepo.camel.FedoraEndpoint.FCREPO_IDENTIFIER;
import static org.fcrepo.jms.headers.DefaultMessageFactory.BASE_URL_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;
import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.commons.lang3.StringUtils;

import com.hp.hpl.jena.graph.Node_URI;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.StmtIterator;

import java.io.InputStream;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;

/**
* Represents a message processor that deletes objects from an
* external triplestore.
*
* @author Aaron Coburn
* @since Nov 8, 2014
*/
public class SparqlDeleteProcessor implements Processor {
/**
* Define how the message should be processed.
*/
public void process(final Exchange exchange) throws Exception {

final Message in = exchange.getIn();
final Model model = createDefaultModel().read(in.getBody(InputStream.class), null);
final StmtIterator triples = model.listStatements();
String subject = null;

if (in.getHeader(FCREPO_BASE_URL) != null) {
subject = in.getHeader(FCREPO_BASE_URL, String.class);
} else if (in.getHeader(BASE_URL_HEADER_NAME) != null) {
subject = in.getHeader(BASE_URL_HEADER_NAME, String.class);
} else {
throw new Exception("No baseURL header available!");
}

if (in.getHeader(FCREPO_IDENTIFIER) != null) {
subject += in.getHeader(FCREPO_IDENTIFIER);
} else if (in.getHeader(IDENTIFIER_HEADER_NAME) != null) {
subject += in.getHeader(IDENTIFIER_HEADER_NAME);
}

// build list of triples to delete
final Set<String> uris = new HashSet<String>();
while ( triples.hasNext() ) {
final Triple triple = triples.next().asTriple();

// add subject uri, if it is part of this object
if ( triple.getSubject().isURI() ) {
final String uri = ((Node_URI)triple.getSubject()).getURI();

if (uriMatches(subject, uri) ) {
uris.add(uri);
}
}

// add object uri, if it is part of this object
if ( triple.getObject().isURI() ) {
final String uri = ((Node_URI)triple.getObject()).getURI();
if (uriMatches(subject, uri) ) {
uris.add(uri);
}
}
}

// build delete commands
final List<String> commands = new ArrayList<String>();
for (final String uri : uris) {
commands.add("DELETE WHERE { <" + uri + "> ?p ?o }");
}

exchange.getIn().setBody(StringUtils.join(commands, ";\n"));
exchange.getIn().setHeader(HTTP_METHOD, "POST");
exchange.getIn().setHeader(CONTENT_TYPE, "application/sparql-update");
}

private static boolean uriMatches(final String resource, final String candidate) {
// All triples that will match this logic are ones that:
// - have a candidate subject or object that equals the target resource of removal, or
// - have a candidate subject or object that is prefixed with the resource of removal
// (therefore catching all children).
return resource.equals(candidate) || candidate.startsWith(resource + "/")
|| candidate.startsWith(resource + "#");
}
}
@@ -0,0 +1,73 @@
/**
* Copyright 2014 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.processor;

import static org.apache.camel.Exchange.HTTP_METHOD;
import static org.apache.camel.Exchange.CONTENT_TYPE;
import static org.apache.camel.Exchange.ACCEPT_CONTENT_TYPE;
import static org.fcrepo.camel.FedoraEndpoint.FCREPO_BASE_URL;
import static org.fcrepo.camel.FedoraEndpoint.FCREPO_IDENTIFIER;
import static org.fcrepo.jms.headers.DefaultMessageFactory.BASE_URL_HEADER_NAME;
import static org.fcrepo.jms.headers.DefaultMessageFactory.IDENTIFIER_HEADER_NAME;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

/**
* Represents a Processor class that formulates a Sparql DESCRIBE query
* that is ready to be POSTed to a Sparql endpoint.
*
* The processor expects the following headers:
* org.fcrepo.jms.identifier
* org.fcrepo.jms.baseURL
* each of which can be overridden with the following:
* FCREPO_IDENTIFIER
* FCREPO_BASE_URL
*
* @author Aaron Coburn
* @since November 6, 2014
*/
public class SparqlDescribeProcessor implements Processor {
/**
* Define how this message should be processed
*/
public void process(final Exchange exchange) throws Exception {

final Message in = exchange.getIn();

String subject = null;

if (in.getHeader(FCREPO_BASE_URL) != null) {
subject = in.getHeader(FCREPO_BASE_URL, String.class);
} else if (in.getHeader(BASE_URL_HEADER_NAME) != null) {
subject = in.getHeader(BASE_URL_HEADER_NAME, String.class);
} else {
throw new Exception("No baseURL header available!");
}

if (in.getHeader(FCREPO_IDENTIFIER) != null) {
subject += in.getHeader(FCREPO_IDENTIFIER);
} else if (in.getHeader(IDENTIFIER_HEADER_NAME) != null) {
subject += in.getHeader(IDENTIFIER_HEADER_NAME);
}

exchange.getIn().setBody("query=DESCRIBE <" + subject + ">");
exchange.getIn().setHeader(HTTP_METHOD, "POST");
exchange.getIn().setHeader(ACCEPT_CONTENT_TYPE, "application/rdf+xml");
exchange.getIn().setHeader(CONTENT_TYPE, "application/x-www-form-urlencoded");
}
}
@@ -0,0 +1,60 @@
/**
* Copyright 2014 DuraSpace, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.fcrepo.camel.processor;

import static org.apache.camel.Exchange.HTTP_METHOD;
import static org.apache.camel.Exchange.CONTENT_TYPE;
import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.sparql.modify.request.QuadDataAcc;
import com.hp.hpl.jena.sparql.modify.request.UpdateDataInsert;
import com.hp.hpl.jena.update.UpdateRequest;

import java.io.InputStream;

/**
* Represents a processor for creating the sparql-update message to
* be passed to an external triplestore.
*
* @author Aaron Coburn
* @since Nov 8, 2014
*/
public class SparqlInsertProcessor implements Processor {
/**
* Define how the message is processed.
*/
public void process(final Exchange exchange) throws Exception {

final Message in = exchange.getIn();
final Model model = createDefaultModel().read(in.getBody(InputStream.class), null, "N-TRIPLE");
final StmtIterator triples = model.listStatements();
final QuadDataAcc add = new QuadDataAcc();
while (triples.hasNext()) {
add.addTriple(triples.nextStatement().asTriple());
}
final UpdateRequest request = new UpdateRequest(new UpdateDataInsert(add));

exchange.getIn().setBody(request.toString());
exchange.getIn().setHeader(HTTP_METHOD, "POST");
exchange.getIn().setHeader(CONTENT_TYPE, "application/sparql-update");
}
}

0 comments on commit 1c22f5f

Please sign in to comment.