Skip to content

Commit

Permalink
gh-3059 Specify a merge graph.
Browse files Browse the repository at this point in the history
  • Loading branch information
GCHQDev404 committed Nov 9, 2023
1 parent 3cee0d5 commit 6a96fe5
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import uk.gov.gchq.gaffer.core.exception.GafferCheckedException;
import uk.gov.gchq.gaffer.federatedstore.FederatedStore;
import uk.gov.gchq.gaffer.federatedstore.FederatedStoreCache;
import uk.gov.gchq.gaffer.federatedstore.operation.FederatedOperation;
import uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil;
import uk.gov.gchq.gaffer.graph.Graph;
Expand All @@ -35,7 +34,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -123,9 +121,6 @@ private static BiFunction getMergeFunction(final FederatedOperation operation, f

// pass the given information from options to the operation context to be available to the merge function
if (operation.containsOption(GIVEN_MERGE_STORE)) {
final String suffixFederatedStoreCacheName = "mergeGraphCache" + new Random().nextInt();
new FederatedStoreCache(suffixFederatedStoreCacheName);
context.setVariable(GIVEN_MERGE_STORE+"cache",suffixFederatedStoreCacheName);
context.setVariable(GIVEN_MERGE_STORE, operation.getOption(GIVEN_MERGE_STORE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import uk.gov.gchq.gaffer.core.exception.GafferRuntimeException;
import uk.gov.gchq.gaffer.data.element.Element;
import uk.gov.gchq.gaffer.data.elementdefinition.view.View;
import uk.gov.gchq.gaffer.exception.SerialisationException;
import uk.gov.gchq.gaffer.federatedstore.FederatedStore;
import uk.gov.gchq.gaffer.federatedstore.operation.FederatedOperation;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.jsonserialisation.JSONSerialiser;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationException;
Expand Down Expand Up @@ -324,9 +326,13 @@ public static HashMap<String, Object> processGivenResultStoreForSpecificMergeFun
if (specificMergeFunction.isRequired(TEMP_RESULTS_GRAPH)) {
final String variable = (String) operationContext.getVariable(GIVEN_MERGE_STORE);
if (variable != null) {
throw new UnsupportedOperationException("Implementation of adding a different type of temporary merge graph " +
"is not yet supported. Behaviour on how to delete the graph is not yet defined. Behaviour of what info " +
"to take from users or admins, is not yet defined.");
try {
final GraphSerialisable deserialise = JSONSerialiser.deserialise(variable, GraphSerialisable.class);
functionContext.put(TEMP_RESULTS_GRAPH, deserialise);
} catch (SerialisationException e) {
throw new GafferRuntimeException("Error trying to extract the given GraphSerialisable temporary merge graph. found:" + variable, e);
}

}
}
return functionContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.StoreProperties;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.store.schema.SchemaElementDefinition;
import uk.gov.gchq.gaffer.user.User;
Expand Down Expand Up @@ -65,7 +66,6 @@
* GraphC has the same elementA with property value 2.
* Asking for a simple GetAllElements with a view filter of property less than 100. Will incorrectly return elementA with a value 3.
* Because outside the functions scope, GraphA filtered out 101.
*/
public class MergeElementFunction implements ContextSpecificMergeFunction<Object, Iterable<Object>, Iterable<Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeElementFunction.class);
Expand Down Expand Up @@ -111,7 +111,9 @@ public MergeElementFunction(final Map<String, Object> context) throws GafferChec

private static void updateViewWithValidationFromSchema(final Map<String, Object> context) {
//Only do this for MapStore, not required for other stores.
if (MapStore.class.getName().equals(getGraph(context).getStoreProperties().getStoreClass())) {
final StoreProperties storeProperties = getTempResultsGraphStoreProperties(context);

if (MapStore.class.getName().equals(storeProperties.getStoreClass())) {
//Update View with
final View view = (View) context.get(VIEW);
final Schema schema = (Schema) context.get(SCHEMA);
Expand All @@ -127,6 +129,14 @@ private static void updateViewWithValidationFromSchema(final Map<String, Object>
}
}

private static StoreProperties getTempResultsGraphStoreProperties(final Map<String, Object> context) {
final Object trg = context.get(TEMP_RESULTS_GRAPH);
final StoreProperties storeProperties = (trg instanceof GraphSerialisable)
? ((GraphSerialisable) trg).getStoreProperties()
: ((Graph) trg).getStoreProperties();
return storeProperties;
}

@Override
public MergeElementFunction createFunctionWithContext(final HashMap<String, Object> context) throws GafferCheckedException {
return new MergeElementFunction(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import uk.gov.gchq.gaffer.federatedstore.util.MergeElementFunctionWithGivenStore;
import uk.gov.gchq.gaffer.graph.GraphConfig;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.jsonserialisation.JSONSerialiser;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.serialisation.implementation.JavaSerialiser;
import uk.gov.gchq.gaffer.store.schema.Schema;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -141,13 +141,14 @@ public void shouldNotReturnAnyElementsAfterInValidationInTemporaryMap() throws E
@Test
public void shouldNotCurrentlySupportOtherTempResultsGraph() throws Exception {

final byte[] givenResultsGraph = new JavaSerialiser().serialise(new GraphSerialisable.Builder()
//given
//The given results graph to use by the function
final byte[] givenResultsGraph = JSONSerialiser.serialise(new GraphSerialisable.Builder()
.config(new GraphConfig("TheGivenResultsGraph"))
.schema(loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"))
.properties(FederatedStoreTestUtil.loadAccumuloStoreProperties(FederatedStoreTestUtil.ACCUMULO_STORE_SINGLE_USE_PROPERTIES))
.build());

//given
addGraphToAccumuloStore(federatedStore, GRAPH_ID_A, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));
addGraphToAccumuloStore(federatedStore, GRAPH_ID_B, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));
addGraphToAccumuloStore(federatedStore, GRAPH_ID_C, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));
Expand All @@ -158,13 +159,26 @@ public void shouldNotCurrentlySupportOtherTempResultsGraph() throws Exception {
addEntity(GRAPH_ID_B, entityOther);

//when
Assertions.assertThatException()
.isThrownBy(() -> federatedStore.execute(new FederatedOperation.Builder()
.op(new GetAllElements())
.option(FederatedStoreUtil.GIVEN_MERGE_STORE, givenResultsGraph.toString())
.mergeFunction(new MergeElementFunctionWithGivenStore())
.build(), contextTestUser()))
.withMessageContaining("Implementation of adding a different type of temporary merge graph is not yet supported");
final Iterable elementsWithPropertyLessThan100 = (Iterable) federatedStore.execute(new FederatedOperation.Builder()
.op(new GetAllElements())
.option(FederatedStoreUtil.GIVEN_MERGE_STORE, new String(givenResultsGraph))
.mergeFunction(new MergeElementFunctionWithGivenStore())
.build(), contextTestUser());

//then
assertThat(elementsWithPropertyLessThan100)
.isNotNull()
.withFailMessage("should not return entity \"basicVertex\" with un-aggregated property 1 or 99")
.doesNotContain(entity1, entity99)
.withFailMessage("should not return entity \"basicVertex\" with an aggregated property 100, which is less than view filter 100")
.doesNotContain(new Entity.Builder()
.group(GROUP_BASIC_ENTITY)
.vertex(BASIC_VERTEX)
.property(PROPERTY_1, 100)
.build())
.withFailMessage("should return entity \"basicVertexOther\" with property 99, which is less than view filter 100")
.containsExactly(entityOther)
.hasSize(1);
}

private void addEntity(final String graphIdA, final Entity entity) throws OperationException {
Expand Down

0 comments on commit 6a96fe5

Please sign in to comment.