Skip to content

Commit

Permalink
Merge branch 'develop' into gh-2495-FederatedGraphStorage-Accumulo-sp…
Browse files Browse the repository at this point in the history
…ecific-code
  • Loading branch information
GCHQDeveloper314 committed Apr 11, 2024
2 parents 0019841 + fe1d93a commit fc0daec
Showing 1 changed file with 87 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import uk.gov.gchq.gaffer.operation.data.EntitySeed;
import uk.gov.gchq.gaffer.operation.graph.SeededGraphFilters.IncludeIncomingOutgoingType;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.generate.GenerateElements;
import uk.gov.gchq.gaffer.operation.impl.generate.GenerateObjects;
import uk.gov.gchq.gaffer.operation.impl.get.GetAdjacentIds;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
Expand All @@ -53,12 +51,9 @@
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferEdgeGenerator;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferEntityGenerator;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferPopEdgeGenerator;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferPopElementGenerator;
import uk.gov.gchq.gaffer.tinkerpop.generator.GafferPopVertexGenerator;
import uk.gov.gchq.gaffer.tinkerpop.service.GafferPopNamedOperationServiceFactory;
import uk.gov.gchq.gaffer.user.User;
import uk.gov.gchq.koryphe.iterable.ChainedIterable;
import uk.gov.gchq.koryphe.iterable.MappedIterable;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -322,12 +317,11 @@ public Vertex addVertex(final Object... keyValues) {
}

public void addVertex(final GafferPopVertex vertex) {
// Create the entity and add to graph
execute(new OperationChain.Builder()
.first(new GenerateElements.Builder<GafferPopVertex>()
.input(vertex)
.generator(new GafferEntityGenerator())
.build())
.then(new AddElements())
.first(new AddElements.Builder()
.input(new GafferEntityGenerator()._apply(vertex))
.build())
.build());

// Set read only if not told otherwise
Expand All @@ -337,13 +331,12 @@ public void addVertex(final GafferPopVertex vertex) {
}

public void addEdge(final GafferPopEdge edge) {
// Create the edge and add to graph
execute(new OperationChain.Builder()
.first(new GenerateElements.Builder<GafferPopEdge>()
.input(edge)
.generator(new GafferEdgeGenerator())
.build())
.then(new AddElements())
.build());
.first(new AddElements.Builder()
.input(new GafferEdgeGenerator()._apply(edge))
.build())
.build());
}

/**
Expand Down Expand Up @@ -377,16 +370,21 @@ public Iterator<Vertex> vertices(final Object... vertexIds) {
.build())
.build();
}

final Iterable<? extends GafferPopElement> result = execute(new Builder()
// Run requested chain on the graph
final Iterable<? extends Element> result = execute(new Builder()
.first(getOperation)
.then(new GenerateObjects.Builder<GafferPopElement>()
.generator(new GafferPopElementGenerator(this))
.build())
.build());

return (Iterator) result.iterator();
}
// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Vertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.iterator();

return translatedResults.iterator();
}

/**
* This performs getRelatedEntities operation on Gaffer.
Expand Down Expand Up @@ -524,13 +522,20 @@ public Iterator<Edge> edges(final Object... elementIds) {
.build();
}

return (Iterator) execute(new Builder()
// Run requested chain on the graph
final Iterable<? extends Element> result = execute(new Builder()
.first(getOperation)
.then(new GenerateObjects.Builder<GafferPopEdge>()
.generator(new GafferPopEdgeGenerator(this))
.build())
.build())
.build());

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Edge> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.iterator();

return translatedResults.iterator();
}

/**
Expand Down Expand Up @@ -633,21 +638,20 @@ public <T> T execute(final OperationChain<T> opChain) {
for (final Operation operation : opChain.getOperations()) {
operation.setOptions(opOptions);

if (operation instanceof Input) {
if (LOGGER.isDebugEnabled() && operation instanceof Input) {
Object input = ((Input) operation).getInput();
if (input != null) {
if (input instanceof MappedIterable) {
((MappedIterable) input).forEach(item -> { LOGGER.info("GafferPop operation input: " + item.toString()); });
} else {
LOGGER.info("GafferPop operation input: " + input.toString());
}
if (input instanceof MappedIterable) {
((MappedIterable) input).forEach(item -> {
LOGGER.debug("GafferPop operation input: {}", item);
});
} else {
LOGGER.debug("GafferPop operation input: {}", input);
}
}

}

try {
LOGGER.info("GafferPop operation chain called: " + opChain.toString());
LOGGER.info("GafferPop operation chain called: {}", opChain.toOverviewString());
return graph.execute(opChain, user);
} catch (final Exception e) {
LOGGER.error("Operation chain failed: " + e.getMessage(), e);
Expand Down Expand Up @@ -691,18 +695,21 @@ private Iterator<GafferPopVertex> verticesWithSeedsAndView(final List<ElementSee
}
}

final Iterable<? extends GafferPopVertex> result = execute(new OperationChain.Builder()
// Run operation on graph
final Iterable<? extends Element> result = execute(new OperationChain.Builder()
.first(getOperation)
.then(new GenerateObjects.Builder<GafferPopVertex>()
.generator(new GafferPopVertexGenerator(this))
.build())
.build());

if (!idVertices.isEmpty()) {
return new ChainedIterable<GafferPopVertex>(result, idVertices).iterator();
} else {
return (Iterator<GafferPopVertex>) result.iterator();
}
// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<GafferPopVertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(GafferPopVertex.class::isInstance)
.map(e -> (GafferPopVertex) e)
.iterator();

return translatedResults.iterator();

}

private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> seeds, final Direction direction, final View view) {
Expand All @@ -711,26 +718,25 @@ private Iterator<Vertex> adjVerticesWithSeedsAndView(final List<ElementSeed> see
}

View processedView = view == null ? createAllEntitiesView() : view;

final Iterable<? extends GafferPopElement> result = execute(new OperationChain.Builder()
final Iterable<? extends Element> result = execute(new OperationChain.Builder()
.first(new GetAdjacentIds.Builder()
.input(seeds)
.view(processedView)
.inOutType(getInOutType(direction))
.build())
.then(new GetElements.Builder()
.view(processedView)
.build())
.then(new GenerateObjects.Builder<GafferPopElement>()
.generator(new GafferPopElementGenerator(this))
.build())
// GetAdjacentIds provides list of entity seeds so run a GetElements to get the actual Entities
.then(new GetElements())
.build());

final Iterable<Vertex> resultVertexes = () -> StreamSupport.stream(result.spliterator(), false)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.iterator();
return resultVertexes.iterator();
// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this);
final Iterable<Vertex> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Vertex.class::isInstance)
.map(e -> (Vertex) e)
.iterator();

return translatedResults.iterator();
}

private Iterator<Edge> edgesWithSeedsAndView(final List<ElementSeed> seeds, final Direction direction, final View view) {
Expand Down Expand Up @@ -761,12 +767,20 @@ private Iterator<Edge> edgesWithSeedsAndView(final List<ElementSeed> seeds, fina
.build();
}

return (Iterator) execute(new OperationChain.Builder()
// Run requested chain on the graph
final Iterable<? extends Element> result = execute(new Builder()
.first(getOperation)
.then(new GenerateObjects.Builder<GafferPopEdge>()
.generator(new GafferPopEdgeGenerator(this, true))
.build())
.build()).iterator();
.build());

// Translate results to Gafferpop elements
final GafferPopElementGenerator generator = new GafferPopElementGenerator(this, true);
final Iterable<Edge> translatedResults = () -> StreamSupport.stream(result.spliterator(), false)
.map(generator::_apply)
.filter(Edge.class::isInstance)
.map(e -> (Edge) e)
.iterator();

return translatedResults.iterator();
}

private View createViewWithEntities(final String... labels) {
Expand All @@ -790,7 +804,7 @@ private View createView(final String... labels) {
View view = null;
if (null != labels && 0 < labels.length) {
final View.Builder viewBuilder = new View.Builder();
final Schema schema = ((Schema) variables().get(GafferPopGraphVariables.SCHEMA).get());
final Schema schema = graph.getSchema();
for (final String label : labels) {
if (schema.isEntity(label)) {
viewBuilder.entity(label);
Expand All @@ -807,8 +821,7 @@ private View createView(final String... labels) {

private View createAllEntitiesView() {
final View.Builder viewBuilder = new View.Builder();
final Schema schema = ((Schema) variables().get(GafferPopGraphVariables.SCHEMA).get());
for (final String group : schema.getEntityGroups()) {
for (final String group : graph.getSchema().getEntityGroups()) {
viewBuilder.entity(group);
}
return viewBuilder.build();
Expand Down Expand Up @@ -837,11 +850,15 @@ private List<ElementSeed> getElementSeeds(final Iterable<Object> ids) {
} else if (id instanceof Edge) {
seeds.add(new EdgeSeed(((Edge) id).outVertex().id(), ((Edge) id).inVertex().id(), true));
// Extract source and destination from ID list
} else if (id instanceof List) {
edgeIdList = (List<Object>) id;
} else if (id instanceof Iterable) {
((Iterable<?>) id).forEach(edgeIdList::add);
// Attempt to extract source and destination IDs from a string form of an array/list
} else if ((id instanceof String) && (((String) id).matches("^\\[.*,.*\\]$"))) {
edgeIdList = Arrays.asList(((String) id).replace("[", "").replace("]", "").split(","));
edgeIdList = Arrays.asList(((String) id)
.replaceAll("\\s", "")
.replace("[", "")
.replace("]", "")
.split(","));
// Assume entity ID as fallback
} else {
seeds.add(new EntitySeed(id));
Expand Down

0 comments on commit fc0daec

Please sign in to comment.