Skip to content

Commit

Permalink
gh-3059 FederatedElementFunction merge graph is only supports MapStor…
Browse files Browse the repository at this point in the history
…e at the moment
  • Loading branch information
GCHQDev404 committed Nov 7, 2023
1 parent df0ec86 commit 2d5bbd4
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static com.google.common.collect.Iterables.isEmpty;
import static java.util.Objects.nonNull;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil.GIVEN_MERGE_STORE;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil.getStoreConfiguredMergeFunction;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil.processIfFunctionIsContextSpecific;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil.updateOperationForGraph;
Expand Down Expand Up @@ -117,6 +118,12 @@ private Object mergeResults(final Iterable resultsFromAllGraphs, final Federated

private static BiFunction getMergeFunction(final FederatedOperation operation, final FederatedStore store, final Context context, final boolean isResultsFromAllGraphsEmpty) throws GafferCheckedException {
final BiFunction mergeFunction;

// pass the given information from options to the operation context to be available to the merge function
if (operation.containsOption(GIVEN_MERGE_STORE)) {
context.setVariable(GIVEN_MERGE_STORE, operation.getOption(GIVEN_MERGE_STORE));
}

if (isResultsFromAllGraphsEmpty) {
//No Merge function required.
mergeFunction = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@
* Such as the re-application of View filter or Schema Validation after the local aggregation of results from multiple graphs.
* By default, a local in memory MapStore is used for local aggregation,
* but a Graph or {@link GraphSerialisable} of any kind could be supplied via the {@link #context} with the key {@link #TEMP_RESULTS_GRAPH}.
* <p>
* An issue not covered:
* GraphA has elementA with a property value 101.
* GraphB has the same elementA with property value 1.
* GraphC has the same elementA with property value 2.
* Asking for a simple GetAllElements with a view filter of property less than 100. Will incorrectly return elementA with a value 3.
* Because outside the functions scope, GraphA filtered out 101.
*/
public class FederatedElementFunction implements ContextSpecificMergeFunction<Object, Iterable<Object>, Iterable<Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(FederatedElementFunction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2022-2023 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.federatedstore.util;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;


public class FederatedElementFunctionWithGivenStore extends FederatedElementFunction {


public static final String TEAR_DOWN_TEMP_GRAPH = "tearDownTempGraph";

@Override
@JsonIgnore
public Set<String> getRequiredContextValues() {
final HashSet<String> set = new HashSet<>(super.getRequiredContextValues());
set.add(TEMP_RESULTS_GRAPH);
set.add(TEAR_DOWN_TEMP_GRAPH);
return Collections.unmodifiableSet(set);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.SCHEMA;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.TEMP_RESULTS_GRAPH;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.USER;
import static uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunction.VIEW;

Expand All @@ -76,6 +77,7 @@ public final class FederatedStoreUtil {

@Deprecated
public static final String DEPRECATED_GRAPHIDS_OPTION = "gaffer.federatedstore.operation.graphIds";
public static final String GIVEN_MERGE_STORE = "gaffer.federatedstore.merge.function.given.merge.store";

private FederatedStoreUtil() {
}
Expand Down Expand Up @@ -304,6 +306,7 @@ public static BiFunction processIfFunctionIsContextSpecific(final BiFunction mer
final ContextSpecificMergeFunction specificMergeFunction = (ContextSpecificMergeFunction) mergeFunction;
HashMap<String, Object> functionContext = new HashMap<>();

functionContext = processGivenResultStoreForSpecificMergeFunction(specificMergeFunction, functionContext, operationContext, federatedStore);
functionContext = processSchemaForSpecificMergeFunction(specificMergeFunction, functionContext, payload, graphIds, operationContext, federatedStore);
functionContext = processViewForSpecificMergeFunction(specificMergeFunction, functionContext, payload);
functionContext = processUserForSpecificMergeFunction(specificMergeFunction, functionContext, operationContext.getUser());
Expand All @@ -317,6 +320,18 @@ public static BiFunction processIfFunctionIsContextSpecific(final BiFunction mer
return rtn;
}

public static HashMap<String, Object> processGivenResultStoreForSpecificMergeFunction(final ContextSpecificMergeFunction specificMergeFunction, final HashMap<String, Object> functionContext, final Context operationContext, final FederatedStore federatedStore) {
if (specificMergeFunction.isRequired(TEMP_RESULTS_GRAPH)) {
final String variable = (String) operationContext.getVariable(GIVEN_MERGE_STORE);
if (variable != null) {
throw new UnsupportedOperationException("Implementation of adding a different type of temporary merge graph " +
"is not yet supported. Behaviour on how to delete the graph is not yet defined. Behaviour of what info " +
"to take from users or admins, is not yet defined.");
}
}
return functionContext;
}

private static HashMap<String, Object> processViewForSpecificMergeFunction(final ContextSpecificMergeFunction specificMergeFunction, final HashMap<String, Object> functionContext, final Operation payload) throws GafferCheckedException {
if (specificMergeFunction.isRequired(VIEW)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@
package uk.gov.gchq.gaffer.federatedstore;


import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import uk.gov.gchq.gaffer.data.element.Entity;
import uk.gov.gchq.gaffer.federatedstore.operation.FederatedOperation;
import uk.gov.gchq.gaffer.federatedstore.util.FederatedElementFunctionWithGivenStore;
import uk.gov.gchq.gaffer.federatedstore.util.FederatedStoreUtil;
import uk.gov.gchq.gaffer.graph.GraphConfig;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.serialisation.implementation.JavaSerialiser;
import uk.gov.gchq.gaffer.store.schema.Schema;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -132,6 +138,34 @@ public void shouldNotReturnAnyElementsAfterInValidationInTemporaryMap() throws E
.hasSize(1);
}

@Test
public void shouldNotCurrentlySupportOtherTempResultsGraph() throws Exception {

final byte[] givenResultsGraph = new JavaSerialiser().serialise(new GraphSerialisable.Builder()
.config(new GraphConfig("TheGivenResultsGraph"))
.schema(loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"))
.properties(FederatedStoreTestUtil.loadAccumuloStoreProperties(FederatedStoreTestUtil.ACCUMULO_STORE_SINGLE_USE_PROPERTIES))
.build());

//given
addGraphToAccumuloStore(federatedStore, GRAPH_ID_A, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));
addGraphToAccumuloStore(federatedStore, GRAPH_ID_B, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));
addGraphToAccumuloStore(federatedStore, GRAPH_ID_C, true, loadSchemaFromJson("/schema/basicEntityValidateLess100Schema.json"));

addEntity(GRAPH_ID_A, entity99); // 99 is valid
addEntity(GRAPH_ID_B, entity1); // 100 is not valid.
addEntity(GRAPH_ID_C, entity1); // correct behavior 100 & 1 is invalid. returning 1 would be incorrect if 100 had been deleted.
addEntity(GRAPH_ID_B, entityOther);

//when
Assertions.assertThatException()
.isThrownBy(() -> federatedStore.execute(new FederatedOperation.Builder()
.op(new GetAllElements())
.option(FederatedStoreUtil.GIVEN_MERGE_STORE, givenResultsGraph.toString())
.mergeFunction(new FederatedElementFunctionWithGivenStore())
.build(), contextTestUser()))
.withMessageContaining("Implementation of adding a different type of temporary merge graph is not yet implemented");
}

private void addEntity(final String graphIdA, final Entity entity) throws OperationException {
federatedStore.execute(new FederatedOperation.Builder()
Expand Down

0 comments on commit 2d5bbd4

Please sign in to comment.