Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Iterator-based RdfStream with Java 8 Streams #962

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,6 +27,7 @@
import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_NON_RDF_SOURCE_DESCRIPTION;
import static org.fcrepo.kernel.api.FedoraTypes.FEDORA_CONTAINER;
import static org.fcrepo.kernel.api.RdfLexicon.HAS_MESSAGE_DIGEST;
import static org.fcrepo.kernel.api.RdfCollectors.toModel;
import static org.fcrepo.kernel.api.utils.ContentDigest.asURI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -300,7 +301,7 @@ private String checkFixity(final FedoraBinary binary)
final URI calculatedChecksum = asURI(SHA_1.toString(), hash);

final DefaultIdentifierTranslator graphSubjects = new DefaultIdentifierTranslator(repo.login());
final Model results = binary.getFixity(graphSubjects).asModel();
final Model results = binary.getFixity(graphSubjects).collect(toModel());
assertNotNull(results);

assertFalse("Found no results!", results.isEmpty());
Expand Down
Expand Up @@ -17,17 +17,18 @@

import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
import org.fcrepo.kernel.api.models.FedoraResource;
import org.fcrepo.kernel.api.utils.iterators.RdfStream;
import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
import org.fcrepo.kernel.modeshape.rdf.impl.DefaultIdentifierTranslator;
import org.fcrepo.kernel.modeshape.rdf.impl.PropertiesRdfContext;
import org.junit.Test;

import javax.jcr.PathNotFoundException;
import javax.jcr.Property;
import javax.jcr.RepositoryException;
import javax.jcr.Session;

import static com.hp.hpl.jena.graph.NodeFactory.createURI;
import static org.fcrepo.kernel.api.RdfLexicon.REPOSITORY_NAMESPACE;
import static org.fcrepo.kernel.api.RdfContext.PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -71,7 +72,8 @@ public void testWriteProperty() throws RepositoryException {
"'some-test-name' }";

// Write the properties
object.updateProperties(new DefaultIdentifierTranslator(session), sparql, new RdfStream());
object.updateProperties(new DefaultIdentifierTranslator(session), sparql,
new DefaultRdfStream(createURI("info:fedora" + testFilePath())));

// Verify
final Property property = object.getNode().getProperty("fedora:name");
Expand Down Expand Up @@ -99,7 +101,8 @@ public void testRemoveProperty() throws RepositoryException {

// Write the properties
final DefaultIdentifierTranslator graphSubjects = new DefaultIdentifierTranslator(session);
object.updateProperties(graphSubjects, sparql, new RdfStream());
object.updateProperties(graphSubjects, sparql, new DefaultRdfStream(
createURI("info:fedora" + testFilePath())));

// Verify property exists
final Property property = object.getNode().getProperty("fedora:remove");
Expand All @@ -116,7 +119,7 @@ public void testRemoveProperty() throws RepositoryException {
// Remove the properties
object.updateProperties(graphSubjects,
sparqlRemove,
object.getTriples(graphSubjects, PropertiesRdfContext.class));
object.getTriples(graphSubjects, PROPERTIES));

// Persist the object (although the property will be removed from memory without this).
session.save();
Expand Down
Expand Up @@ -16,11 +16,11 @@
package org.fcrepo.http.api;


import static com.google.common.collect.Iterators.concat;
import static com.google.common.collect.Iterators.filter;
import static com.google.common.collect.Iterators.transform;
import static com.hp.hpl.jena.rdf.model.ModelFactory.createDefaultModel;
import static com.hp.hpl.jena.vocabulary.RDF.type;
import static java.util.EnumSet.of;
import static java.util.stream.Stream.concat;
import static java.util.stream.Stream.empty;
import static javax.ws.rs.core.HttpHeaders.CACHE_CONTROL;
import static javax.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM_TYPE;
import static javax.ws.rs.core.Response.ok;
Expand All @@ -40,15 +40,26 @@
import static org.fcrepo.kernel.api.RdfLexicon.INDIRECT_CONTAINER;
import static org.fcrepo.kernel.api.RdfLexicon.LDP_NAMESPACE;
import static org.fcrepo.kernel.api.RdfLexicon.isManagedNamespace;
import static org.fcrepo.kernel.api.RdfContext.EMBED_RESOURCES;
import static org.fcrepo.kernel.api.RdfContext.INBOUND_REFERENCES;
import static org.fcrepo.kernel.api.RdfContext.LDP_CONTAINMENT;
import static org.fcrepo.kernel.api.RdfContext.LDP_MEMBERSHIP;
import static org.fcrepo.kernel.api.RdfContext.MINIMAL;
import static org.fcrepo.kernel.api.RdfContext.PROPERTIES;
import static org.fcrepo.kernel.api.RdfContext.SERVER_MANAGED;
import static org.fcrepo.kernel.modeshape.rdf.ManagedRdf.isManagedTriple;
import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaces;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;

import javax.inject.Inject;
import javax.jcr.AccessDeniedException;
Expand All @@ -74,6 +85,7 @@
import org.fcrepo.http.commons.domain.Range;
import org.fcrepo.http.commons.domain.ldp.LdpPreferTag;
import org.fcrepo.http.commons.responses.RangeRequestInputStream;
import org.fcrepo.http.commons.responses.RdfNamespacedStream;
import org.fcrepo.kernel.api.exception.InvalidChecksumException;
import org.fcrepo.kernel.api.exception.MalformedRdfException;
import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
Expand All @@ -82,31 +94,17 @@
import org.fcrepo.kernel.api.models.FedoraResource;
import org.fcrepo.kernel.api.models.NonRdfSource;
import org.fcrepo.kernel.api.models.NonRdfSourceDescription;
import org.fcrepo.kernel.api.RdfContext;
import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
import org.fcrepo.kernel.api.RdfStream;
import org.fcrepo.kernel.api.services.policy.StoragePolicyDecisionPoint;
import org.fcrepo.kernel.api.utils.iterators.RdfStream;
import org.fcrepo.kernel.modeshape.rdf.impl.AclRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.SkolemNodeRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.ChildrenRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.ContentRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.HashRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.LdpContainerRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.LdpIsMemberOfRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.LdpRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.ParentRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.PropertiesRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.ReferencesRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.RootRdfContext;
import org.fcrepo.kernel.modeshape.rdf.impl.TypeRdfContext;
import org.fcrepo.kernel.modeshape.services.TransactionServiceImpl;

import org.apache.jena.riot.Lang;
import org.glassfish.jersey.media.multipart.ContentDisposition;
import org.jvnet.hk2.annotations.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.Statement;

/**
* An abstract class that sits between AbstractResource and any resource that
Expand Down Expand Up @@ -158,11 +156,14 @@ protected Response getContent(final String rangeValue,
* @param limit is the number of child resources returned in the response, -1 for all
* @param rdfStream to which response RDF will be concatenated
* @return HTTP response
* @throws IOException
* @throws IOException in case of error extracting content
*/
protected Response getContent(final String rangeValue,
final int limit,
final RdfStream rdfStream) throws IOException {

final RdfNamespacedStream outputStream;

if (resource() instanceof FedoraBinary) {

final String contentTypeString = ((FedoraBinary) resource()).getMimeType();
Expand All @@ -178,7 +179,10 @@ protected Response getContent(final String rangeValue,
final Model inputModel = createDefaultModel()
.read(content, (resource()).toString(), format);

rdfStream.concat(Iterators.transform(inputModel.listStatements(), Statement::asTriple));
outputStream = new RdfNamespacedStream(
new DefaultRdfStream(rdfStream.topic(), concat(rdfStream,
DefaultRdfStream.fromModel(rdfStream.topic(), inputModel))),
getNamespaces(session()));
} else {

final MediaType mediaType = MediaType.valueOf(contentTypeString);
Expand All @@ -196,14 +200,17 @@ protected Response getContent(final String rangeValue,
}

} else {
rdfStream.concat(getResourceTriples(limit));
outputStream = new RdfNamespacedStream(
new DefaultRdfStream(rdfStream.topic(), concat(rdfStream,
getResourceTriples(limit))),
getNamespaces(session()));
if (prefer != null) {
prefer.getReturn().addResponseHeaders(servletResponse);
}
}
servletResponse.addHeader("Vary", "Accept, Range, Accept-Encoding, Accept-Language");

return ok(rdfStream).build();
return ok(outputStream).build();
}

protected RdfStream getResourceTriples() {
Expand Down Expand Up @@ -233,69 +240,60 @@ protected RdfStream getResourceTriples(final int limit) {

final LdpPreferTag ldpPreferences = new LdpPreferTag(returnPreference);

final RdfStream rdfStream = new RdfStream();

final Predicate<Triple> tripleFilter = ldpPreferences.prefersServerManaged() ? x -> true :
IS_MANAGED_TRIPLE.negate();

if (ldpPreferences.prefersServerManaged()) {
rdfStream.concat(getTriples(LdpRdfContext.class));
}
final List<Stream<Triple>> streams = new ArrayList<>();

rdfStream.concat(filter(getTriples(TypeRdfContext.class), tripleFilter::test));

rdfStream.concat(filter(getTriples(PropertiesRdfContext.class), tripleFilter::test));
if (returnPreference.getValue().equals("minimal")) {
streams.add(getTriples(of(PROPERTIES, MINIMAL)).filter(tripleFilter));

if (!returnPreference.getValue().equals("minimal")) {
if (ldpPreferences.prefersServerManaged()) {
streams.add(getTriples(of(SERVER_MANAGED, MINIMAL)));
}
} else {
streams.add(getTriples(PROPERTIES).filter(tripleFilter));

// Additional server-managed triples about this resource
if (ldpPreferences.prefersServerManaged()) {
rdfStream.concat(getTriples(AclRdfContext.class));
rdfStream.concat(getTriples(RootRdfContext.class));
rdfStream.concat(getTriples(ContentRdfContext.class));
rdfStream.concat(getTriples(ParentRdfContext.class));
streams.add(getTriples(SERVER_MANAGED));
}

// containment triples about this resource
if (ldpPreferences.prefersContainment()) {
rdfStream.concat(getTriples(ChildrenRdfContext.class).limit(limit));
if (limit == -1) {
streams.add(getTriples(LDP_CONTAINMENT));
} else {
streams.add(getTriples(LDP_CONTAINMENT).limit(limit));
}
}

// LDP container membership triples for this resource
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that you are changing the logic to include HashRdfContext and SkolemNodeRdfContext in both Prefer=minimal and !Prefer=minimal, whereas they previously were only included in !Prefer=minimal.

Note: Hash and Skolem are being added here: https://github.com/fcrepo4/fcrepo4/pull/962/files#diff-d372035f89c8d0846920e20bd13e0145R245

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true. Should Hash and Skolem contexts be excluded from representation=minimal?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the way we have been interpreting minimal to date, so I would say yes.

if (ldpPreferences.prefersMembership()) {
rdfStream.concat(getTriples(LdpContainerRdfContext.class));
rdfStream.concat(getTriples(LdpIsMemberOfRdfContext.class));
streams.add(getTriples(LDP_MEMBERSHIP));
}

// Embed all hash and blank nodes
// using IS_MANAGED_TRIPLE directly to avoid Prefer header logic (we never want them for hash fragments)
rdfStream.concat(filter(getTriples(HashRdfContext.class), IS_MANAGED_TRIPLE.negate()::test));
rdfStream.concat(filter(getTriples(SkolemNodeRdfContext.class), tripleFilter::test));

// Include inbound references to this object
if (ldpPreferences.prefersReferences()) {
rdfStream.concat(getTriples(ReferencesRdfContext.class));
streams.add(getTriples(INBOUND_REFERENCES));
}

// Embed the children of this object
if (ldpPreferences.prefersEmbed()) {

final Iterator<FedoraResource> children = resource().getChildren();

rdfStream.concat(filter(concat(transform(children, child ->
child.getTriples(translator(),
ImmutableList.of(
TypeRdfContext.class, PropertiesRdfContext.class, SkolemNodeRdfContext.class)))),
tripleFilter::test));

streams.add(getTriples(EMBED_RESOURCES));
}
}

final RdfStream rdfStream = new DefaultRdfStream(
translator().reverse().convert(resource()).asNode(),
streams.stream().reduce(empty(), Stream::concat));

if (httpTripleUtil != null && ldpPreferences.prefersServerManaged()) {
httpTripleUtil.addHttpComponentModelsForResourceToStream(rdfStream, resource(), uriInfo, translator());
return httpTripleUtil.addHttpComponentModelsForResourceToStream(rdfStream, resource(), uriInfo,
translator());
}


return rdfStream;
}

Expand Down Expand Up @@ -365,11 +363,19 @@ protected Response getBinaryContent(final String rangeValue)

}

protected RdfStream getTriples(final Class<? extends RdfStream> x) {
protected RdfStream getTriples(final EnumSet<RdfContext> x) {
return getTriples(resource(), x);
}

/**
 * Retrieves the triples of the given resource for a set of RDF contexts.
 * Delegates to {@code resource.getTriples}, passing this class's {@code translator()}.
 *
 * @param resource the resource whose triples are requested
 * @param x the set of RDF contexts passed through to the resource
 * @return the RDF stream returned by the resource
 */
protected RdfStream getTriples(final FedoraResource resource, final EnumSet<RdfContext> x) {
return resource.getTriples(translator(), x);
}

/**
 * Retrieves the triples for a single RDF context, using {@code resource()} as the resource.
 * Delegates to the two-argument {@code getTriples} overload.
 *
 * @param x the RDF context passed through to the overload
 * @return the RDF stream returned by the overload
 */
protected RdfStream getTriples(final RdfContext x) {
return getTriples(resource(), x);
}

protected RdfStream getTriples(final FedoraResource resource, final Class<? extends RdfStream> x) {
protected RdfStream getTriples(final FedoraResource resource, final RdfContext x) {
return resource.getTriples(translator(), x);
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import static org.fcrepo.http.commons.domain.RDFMediaType.RDF_XML;
import static org.fcrepo.http.commons.domain.RDFMediaType.TURTLE;
import static org.fcrepo.http.commons.domain.RDFMediaType.TURTLE_X;
import static org.fcrepo.kernel.modeshape.utils.NamespaceTools.getNamespaces;
import static org.slf4j.LoggerFactory.getLogger;

import javax.inject.Inject;
Expand All @@ -38,8 +39,9 @@

import com.google.common.annotations.VisibleForTesting;
import org.fcrepo.http.commons.responses.HtmlTemplate;
import org.fcrepo.http.commons.responses.RdfNamespacedStream;
import org.fcrepo.kernel.api.models.FedoraBinary;
import org.fcrepo.kernel.api.utils.iterators.RdfStream;
import org.fcrepo.kernel.api.rdf.DefaultRdfStream;
import org.slf4j.Logger;
import org.springframework.context.annotation.Scope;

Expand Down Expand Up @@ -92,17 +94,17 @@ public FedoraFixity(final String externalPath) {
@Produces({TURTLE + ";qs=10", JSON_LD + ";qs=8",
N3, N3_ALT2, RDF_XML, NTRIPLES, APPLICATION_XML, TEXT_PLAIN, TURTLE_X,
TEXT_HTML, APPLICATION_XHTML_XML, "*/*"})
public RdfStream getDatastreamFixity() {
public RdfNamespacedStream getDatastreamFixity() {

if (!(resource() instanceof FedoraBinary)) {
throw new NotFoundException(resource() + " is not a binary");
}

LOGGER.info("Get fixity for '{}'", externalPath);
return ((FedoraBinary)resource()).getFixity(translator())
.topic(translator().reverse().convert(resource()).asNode())
.session(session);

return new RdfNamespacedStream(
new DefaultRdfStream(translator().reverse().convert(resource()).asNode(),
((FedoraBinary)resource()).getFixity(translator())),
getNamespaces(session()));
}

@Override
Expand Down