Skip to content

Commit

Permalink
Merge branch 'develop' into gh-2425-remove-duplicate-upgradeOperation…
Browse files Browse the repository at this point in the history
…Input
  • Loading branch information
GCHQDev404 committed Jun 28, 2021
2 parents a7f9e1a + ca29a8a commit 5af336f
Show file tree
Hide file tree
Showing 20 changed files with 913 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,12 @@ private void validateSharedGroupsAreCompatible(final Map<String, ? extends Schem
continue;
}

// Check to see if either of the properties are a subset of another properties
if (elementDef2.properties.entrySet().containsAll(elementDef1.properties.entrySet()) ||
elementDef1.properties.entrySet().containsAll(elementDef2.properties.entrySet())) {
continue;
}

throw new SchemaException("Element group properties cannot be defined in different schema parts, they must all be defined in a single schema part. "
+ "Please fix this group: " + sharedGroup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.swagger.annotations.ApiOperation;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.rest.model.OperationDetail;
Expand Down Expand Up @@ -100,4 +101,13 @@ public interface IOperationController {
)
@ApiOperation("Executes an operation against a Store")
ResponseEntity<Object> execute(final Operation operation);

@RequestMapping(
method = POST,
path = "/execute/chunked",
consumes = APPLICATION_JSON_VALUE,
produces = { TEXT_PLAIN_VALUE, APPLICATION_JSON_VALUE }
)
@ApiOperation("Executes an operation against a Store, returning a chunked output")
ResponseEntity<StreamingResponseBody> executeChunked(final Operation operation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package uk.gov.gchq.gaffer.rest.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import uk.gov.gchq.gaffer.commonutil.CloseableUtil;
import uk.gov.gchq.gaffer.commonutil.pair.Pair;
import uk.gov.gchq.gaffer.core.exception.GafferRuntimeException;
import uk.gov.gchq.gaffer.core.exception.Status;
Expand All @@ -33,8 +37,11 @@
import uk.gov.gchq.gaffer.rest.model.OperationDetail;
import uk.gov.gchq.gaffer.rest.service.v2.AbstractOperationService;

import java.io.IOException;
import java.util.Set;


import static uk.gov.gchq.gaffer.jsonserialisation.JSONSerialiser.createDefaultMapper;
import static uk.gov.gchq.gaffer.rest.ServiceConstants.GAFFER_MEDIA_TYPE;
import static uk.gov.gchq.gaffer.rest.ServiceConstants.GAFFER_MEDIA_TYPE_HEADER;
import static uk.gov.gchq.gaffer.rest.ServiceConstants.JOB_ID_HEADER;
Expand All @@ -46,6 +53,8 @@ public class OperationController extends AbstractOperationService implements IOp
private final UserFactory userFactory;
private final ExamplesFactory examplesFactory;

public final ObjectMapper mapper = createDefaultMapper();

@Autowired
public OperationController(final GraphFactory graphFactory, final UserFactory userFactory, final ExamplesFactory examplesFactory) {
this.graphFactory = graphFactory;
Expand Down Expand Up @@ -115,11 +124,51 @@ public Operation getOperationExample(@PathVariable("className") @ApiParam(name =

@Override
public ResponseEntity<Object> execute(@RequestBody final Operation operation) {
Pair<Object, String> resultAndGraphId = _execute(operation, userFactory.createContext());
Pair<Object, String> resultAndJobId = _execute(operation, userFactory.createContext());
return ResponseEntity.ok()
.header(GAFFER_MEDIA_TYPE_HEADER, GAFFER_MEDIA_TYPE)
.header(JOB_ID_HEADER, resultAndJobId.getSecond())
.body(resultAndJobId.getFirst());
}

@Override
public ResponseEntity<StreamingResponseBody> executeChunked(@RequestBody final Operation operation) {
StreamingResponseBody responseBody = response -> {
try {
Pair<Object, String> resultAndJobId = _execute(operation, userFactory.createContext());
Object result = resultAndJobId.getFirst();
if (result instanceof Iterable) {
final Iterable itr = (Iterable) result;
try {
for (final Object item : itr) {
String itemString = mapper.writeValueAsString(item) + "\r\n";
response.write(itemString.getBytes());
response.flush();
}
} catch (final IOException ioe) {
throw new GafferRuntimeException("Unable to serialise chunk: ", ioe, Status.INTERNAL_SERVER_ERROR);
} finally {
CloseableUtil.close(itr);
}
} else {
try {
response.write(mapper.writeValueAsString(result).getBytes());
response.flush();
} catch (final IOException ioe) {
throw new GafferRuntimeException("Unable to serialise chunk: ", ioe, Status.INTERNAL_SERVER_ERROR);
}
}
} catch (final Exception e) {
throw new GafferRuntimeException("Unable to create chunk: ", e, Status.INTERNAL_SERVER_ERROR);
} finally {
CloseableUtil.close(operation);
}
};

return ResponseEntity.ok()
.header(GAFFER_MEDIA_TYPE_HEADER, GAFFER_MEDIA_TYPE)
.header(JOB_ID_HEADER, resultAndGraphId.getSecond())
.body(resultAndGraphId.getFirst());
.contentType(MediaType.APPLICATION_JSON)
.body(responseBody);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,34 @@
import org.apache.commons.lang3.exception.CloneFailedException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.http.ResponseEntity;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import uk.gov.gchq.gaffer.core.exception.GafferRuntimeException;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphConfig;
import uk.gov.gchq.gaffer.jsonserialisation.JSONSerialiser;
import uk.gov.gchq.gaffer.operation.Operation;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.impl.DiscardOutput;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.rest.factory.ExamplesFactory;
import uk.gov.gchq.gaffer.rest.factory.GraphFactory;
import uk.gov.gchq.gaffer.rest.factory.UserFactory;
import uk.gov.gchq.gaffer.rest.model.OperationDetail;
import uk.gov.gchq.gaffer.rest.model.OperationField;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.StoreProperties;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.user.User;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -48,6 +58,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static uk.gov.gchq.gaffer.core.exception.Status.BAD_REQUEST;
Expand Down Expand Up @@ -248,6 +259,20 @@ public void shouldReturnOptionsAndSummariesForEnumFields() throws Exception {

assertEquals(fields, operationFields);
}
@Test
public void shouldCorrectlyChunkIterables() throws IOException, OperationException {
// Given
when(userFactory.createContext()).thenReturn(new Context(new User()));
when(store.execute(any(Output.class), any(Context.class))).thenReturn(Arrays.asList(1, 2, 3));

// When
ResponseEntity<StreamingResponseBody> response = operationController.executeChunked(new GetAllElements());
OutputStream output = new ByteArrayOutputStream();
response.getBody().writeTo(output);

// Then
assertEquals("1\r\n2\r\n3\r\n", output.toString());
}

private static class UninstantiatableOperation implements Operation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,32 @@

package uk.gov.gchq.gaffer.rest.integration.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.util.LinkedMultiValueMap;

import uk.gov.gchq.gaffer.cache.impl.HashMapCacheService;
import uk.gov.gchq.gaffer.commonutil.StreamUtil;
import uk.gov.gchq.gaffer.core.exception.Error;
import uk.gov.gchq.gaffer.data.element.Entity;
import uk.gov.gchq.gaffer.federatedstore.operation.GetAllGraphIds;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphConfig;
import uk.gov.gchq.gaffer.mapstore.MapStoreProperties;
import uk.gov.gchq.gaffer.operation.impl.add.AddElements;
import uk.gov.gchq.gaffer.operation.impl.get.GetAllElements;
import uk.gov.gchq.gaffer.operation.impl.job.GetAllJobDetails;
import uk.gov.gchq.gaffer.rest.factory.GraphFactory;
import uk.gov.gchq.gaffer.rest.factory.MockGraphFactory;
import uk.gov.gchq.gaffer.store.Context;
import uk.gov.gchq.gaffer.store.StoreProperties;
import uk.gov.gchq.gaffer.store.schema.Schema;
import uk.gov.gchq.gaffer.store.schema.SchemaEntityDefinition;
import uk.gov.gchq.gaffer.store.schema.TypeDefinition;
import uk.gov.gchq.koryphe.impl.binaryoperator.StringConcat;

import java.io.IOException;
import java.util.Set;
Expand All @@ -42,6 +52,7 @@
import static org.mockito.Mockito.when;
import static uk.gov.gchq.gaffer.cache.util.CacheProperties.CACHE_SERVICE_CLASS;
import static uk.gov.gchq.gaffer.core.exception.Status.SERVICE_UNAVAILABLE;
import static uk.gov.gchq.gaffer.jsonserialisation.JSONSerialiser.createDefaultMapper;

public class OperationControllerIT extends AbstractRestApiIT {

Expand All @@ -55,7 +66,29 @@ private MockGraphFactory getGraphFactory() {

@Test
public void shouldReturnHelpfulErrorMessageIfJsonIsIncorrect() {
// Given
Graph graph = new Graph.Builder()
.config(StreamUtil.graphConfig(this.getClass()))
.storeProperties(new MapStoreProperties())
.addSchema(new Schema())
.build();

when(getGraphFactory().getGraph()).thenReturn(graph);

// When
String request = "{\"class\"\"GetAllElements\"}";

LinkedMultiValueMap headers = new LinkedMultiValueMap();
headers.add("Content-Type", "application/json;charset=utf-8");

final ResponseEntity<Error> response = post("/graph/operations/execute",
new HttpEntity(request, headers),
Error.class);

// Then
assertEquals(400, response.getStatusCode().value());
assertEquals(400, response.getBody().getStatusCode());
assertTrue(response.getBody().getSimpleMessage().contains("was expecting a colon to separate field name and value"));
}

@Test
Expand Down Expand Up @@ -154,4 +187,55 @@ public void shouldReturnSameJobIdInHeaderAsGetAllJobDetailsOperation() throws IO
}
}

@Test
public void shouldCorrectlyStreamExecuteChunked() throws Exception {
// Given
final Schema schema = new Schema.Builder()
.entity("g1", new SchemaEntityDefinition.Builder()
.vertex("string")
.build())
.type("string", new TypeDefinition.Builder()
.clazz(String.class)
.aggregateFunction(new StringConcat())
.build())
.build();

Graph graph = new Graph.Builder()
.config(new GraphConfig("id"))
.storeProperties(new MapStoreProperties())
.addSchema(schema)
.build();

when(getGraphFactory().getGraph()).thenReturn(graph);

Entity ent1 = new Entity.Builder()
.group("g1")
.vertex("v1")
.build();

Entity ent2 = new Entity.Builder()
.group("g1")
.vertex("v2")
.build();

final ObjectMapper mapper = createDefaultMapper();

graph.execute(new AddElements.Builder()
.input(ent1)
.build(), new Context());

graph.execute(new AddElements.Builder()
.input(ent2)
.build(), new Context());

// When
final ResponseEntity<String> response = post("/graph/operations/execute/chunked",
new GetAllElements.Builder()
.build(),
String.class);

// Then
String expected = mapper.writeValueAsString(ent1) + "\r\n" + mapper.writeValueAsString(ent2) + "\r\n";
assertEquals(expected, response.getBody());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import static java.util.Objects.nonNull;
import static uk.gov.gchq.gaffer.federatedstore.FederatedStoreConstants.KEY_OPERATION_OPTIONS_GRAPH_IDS;
import static uk.gov.gchq.gaffer.federatedstore.FederatedStoreConstants.KEY_SKIP_FAILED_FEDERATED_STORE_EXECUTE;

Expand Down Expand Up @@ -103,9 +105,11 @@ public static List<String> getCleanStrings(final String value) {
* @return cloned operation with modified View for the given graph.
*/
public static <OP extends Operation> OP updateOperationForGraph(final OP operation, final Graph graph) {
OP resultOp = operation;
if (operation instanceof Operations) {
resultOp = (OP) operation.shallowClone();
OP resultOp = (OP) operation.shallowClone();
if (nonNull(resultOp.getOptions())) {
resultOp.setOptions(new HashMap<>(resultOp.getOptions()));
}
if (resultOp instanceof Operations) {
final Operations<Operation> operations = (Operations) resultOp;
final List<Operation> resultOperations = new ArrayList<>();
for (final Operation nestedOp : operations.getOperations()) {
Expand All @@ -117,14 +121,13 @@ public static <OP extends Operation> OP updateOperationForGraph(final OP operati
resultOperations.add(updatedNestedOp);
}
operations.updateOperations(resultOperations);
} else if (operation instanceof OperationView) {
final View view = ((OperationView) operation).getView();
} else if (resultOp instanceof OperationView) {
final View view = ((OperationView) resultOp).getView();
if (null != view && view.hasGroups()) {
final View validView = createValidView(view, graph.getSchema());
if (view != validView) {
// If the view is not the same instance as the original view
// then clone the operation and add the new view.
resultOp = (OP) operation.shallowClone();
if (validView.hasGroups()) {
((OperationView) resultOp).setView(validView);
} else if (!graph.hasTrait(StoreTrait.DYNAMIC_SCHEMA)) {
Expand All @@ -134,8 +137,8 @@ public static <OP extends Operation> OP updateOperationForGraph(final OP operati
}
}
}
} else if (operation instanceof AddElements) {
final AddElements addElements = ((AddElements) operation);
} else if (resultOp instanceof AddElements) {
final AddElements addElements = ((AddElements) resultOp);
if (null == addElements.getInput()) {
if (!addElements.isValidate() || !addElements.isSkipInvalidElements()) {
LOGGER.debug("Invalid elements will be skipped when added to {}", graph.getGraphId());
Expand Down
Loading

0 comments on commit 5af336f

Please sign in to comment.