Skip to content

Commit

Permalink
add support for partial graph load and merge
Browse files Browse the repository at this point in the history
  • Loading branch information
liorperry committed Jul 28, 2019
1 parent de6a44d commit d1c7fbb
Show file tree
Hide file tree
Showing 19 changed files with 470 additions and 34 deletions.
29 changes: 27 additions & 2 deletions fuse-core/src/main/java/com/yangdb/fuse/client/BaseFuseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,26 @@
import com.yangdb.fuse.model.results.Entity;
import com.yangdb.fuse.model.results.QueryResultBase;
import com.yangdb.fuse.model.results.Relationship;
import com.yangdb.fuse.model.transport.*;
import com.yangdb.fuse.model.transport.CreateJsonQueryRequest;
import com.yangdb.fuse.model.transport.CreatePageRequest;
import com.yangdb.fuse.model.transport.CreateQueryRequest;
import com.yangdb.fuse.model.transport.PlanTraceOptions;
import com.yangdb.fuse.model.transport.cursor.CreateCursorRequest;
import com.yangdb.fuse.model.transport.cursor.CreatePathsCursorRequest;
import javaslang.Tuple2;
import javaslang.collection.Stream;
import org.jooby.MediaType;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Map;
import java.util.UUID;

import static io.restassured.RestAssured.given;
import static com.yangdb.fuse.client.FuseClient.*;
import static io.restassured.RestAssured.given;

/**
* Created by Roman on 11/05/2017.
Expand Down Expand Up @@ -97,6 +103,25 @@ public QueryResourceInfo loadData(String ontology, URL resource) throws IOExcept
return loadData(ontology,objectMapper.readValue(resource,LogicalGraphModel.class));
}

@Override
public QueryResourceInfo uploadFile(String ontology, URL resourceFile) throws URISyntaxException {
final File file = new File(resourceFile.getFile());
final URI uri = resourceFile.toURI();

String resourceURl = String.format("%s/load/ontology/%s/upload", this.fuseUrl, ontology);

String result = given()
.multiPart("file", file)
.formParam("file", uri.toASCIIString())
.contentType("multipart/form-data")
.post(resourceURl)
.thenReturn()
.print();

return new QueryResourceInfo(QueryMetadata.Type.concrete, resourceURl,ontology,result);

}

@Override
public QueryResourceInfo postQuery(String queryStoreUrl, Query query) throws IOException {
return postQuery(queryStoreUrl,query, PlanTraceOptions.of(PlanTraceOptions.Level.none));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.yangdb.fuse.model.transport.cursor.CreateCursorRequest;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.function.Predicate;

Expand Down Expand Up @@ -67,7 +68,7 @@ static long countGraphElements(QueryResultBase pageData, boolean relationship, b
}

static String postRequest(String url, Object body) throws IOException {
return given().contentType("application/json")
return given().contentType("application/json")
.body(body)
.post(url)
.thenReturn()
Expand Down Expand Up @@ -98,6 +99,8 @@ static <T> T unwrapDouble(String response) throws IOException {

QueryResourceInfo loadData(String ontology, URL resource) throws IOException;

QueryResourceInfo uploadFile(String ontology, URL resource) throws IOException, URISyntaxException;

QueryResourceInfo postQuery(String queryStoreUrl, Query query) throws IOException;

QueryResourceInfo postQuery(String queryStoreUrl, String query, String ontology) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.io.IOException;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Arrays;
Expand Down Expand Up @@ -166,6 +167,11 @@ public QueryResourceInfo loadData(String ontology, URL resource) throws IOExcept
return selectNode().loadData(ontology,resource);
}

@Override
public QueryResourceInfo uploadFile(String ontology, URL resource) throws IOException, URISyntaxException {
return selectNode().uploadFile(ontology,resource);
}

@Override
public QueryResourceInfo postQuery(String queryStoreUrl, Query query) throws IOException {
return selectNode().postQuery(queryStoreUrl,query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ application.host = 0.0.0.0
application.port = 8888
application.profile = activeProfile

server.http.HeaderSize = 8k
# Max response buffer size
server.http.ResponseBufferSize = 10m
# Max request body size to keep in memory
server.http.RequestBufferSize = 10m
# Max request size total (body + header)
server.http.MaxRequestSize = 10m

modules.activeProfile = [
"com.yangdb.fuse.services.modules.ServiceModule",
"com.yangdb.fuse.services.modules.LoggingJacksonModule",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ application.host = 0.0.0.0
application.port = 8888
application.profile = activeProfile

server.http.HeaderSize = 8k
# Max response buffer size
server.http.ResponseBufferSize = 10m
# Max request body size to keep in memory
server.http.RequestBufferSize = 10m
# Max request size total (body + header)
server.http.MaxRequestSize = 10m

modules.activeProfile = [
"com.yangdb.fuse.services.modules.ServiceModule",
"com.yangdb.fuse.dispatcher.modules.NewDispatcherModule",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public KnowledgeDataLoader(Config config, Client client, RawSchema schema, Ontol
final Optional<OntologyTransformer> assembly = transformerProvider.transformer(config.getString("assembly"));
if(!assembly.isPresent())
throw new IllegalArgumentException("No transformer provider found for selected ontology "+config.getString("assembly"));
this.transformer = new KnowledgeTransformer(assembly.get(),schema,idGenerator );
this.transformer = new KnowledgeTransformer(assembly.get(),schema,idGenerator,client );
this.client = client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ public class KnowledgeContext {
private List<ValueBuilder> eValues;
private List<RelationBuilder> relations;
private List<RvalueBuilder> rValues;
private List<RelationBuilder.EntityRelationBuilder> relationBuilders;

public KnowledgeContext() {
entities = new ArrayList<>();
eValues = new ArrayList<>();
relations = new ArrayList<>();
rValues = new ArrayList<>();
relationBuilders = new ArrayList<>();
failed = new ArrayList<>();
}

Expand All @@ -54,6 +56,10 @@ public void add(EntityBuilder builder) {
entities.add(builder);
}

public void add(RelationBuilder.EntityRelationBuilder builder) {
relationBuilders.add(builder);
}

public void add(RelationBuilder builder) {
relations.add(builder);
}
Expand All @@ -70,6 +76,10 @@ public List<EntityBuilder> getEntities() {
return entities;
}

public List<RelationBuilder.EntityRelationBuilder> getRelationBuilders() {
return relationBuilders;
}

public List<ValueBuilder> geteValues() {
return eValues;
}
Expand All @@ -86,6 +96,10 @@ public Optional<EntityBuilder> findEntityById(String id) {
return entities.stream().filter(e->e.logicalId.equals(id)).findAny();
}

public Optional<EntityBuilder> findEntityByTechId(String id) {
return entities.stream().filter(e->e.techId.equals(id)).findAny();
}

public Optional<EntityBuilder> findEntityByProperty(String property,String value) {
return entities.stream()
.filter(e->e.additionalProperties.containsKey(property))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -33,6 +33,7 @@
import com.yangdb.fuse.model.ontology.transformer.TransformerEntityType;
import com.yangdb.fuse.model.ontology.transformer.TransformerProperties;
import com.yangdb.fuse.model.ontology.transformer.TransformerRelationType;
import org.elasticsearch.client.Client;
import org.geojson.Point;

import java.text.ParseException;
Expand All @@ -41,6 +42,7 @@
import java.util.Map;
import java.util.Optional;

import static com.yangdb.fuse.assembly.knowledge.KnowledgeRawSchema.ENTITY;
import static com.yangdb.fuse.assembly.knowledge.load.KnowledgeWriterContext.format;
import static com.yangdb.fuse.assembly.knowledge.load.builder.EntityBuilder.*;
import static com.yangdb.fuse.assembly.knowledge.load.builder.ValueBuilder.*;
Expand All @@ -55,13 +57,15 @@ public class KnowledgeTransformer {
private OntologyTransformer transformer;
private RawSchema schema;
private IdGeneratorDriver<Range> idGenerator;
private Client client;
private KnowledgeWriterContext writerContext;

@Inject
public KnowledgeTransformer(OntologyTransformer transformer, RawSchema schema, IdGeneratorDriver<Range> idGenerator) {
public KnowledgeTransformer(OntologyTransformer transformer, RawSchema schema, IdGeneratorDriver<Range> idGenerator, Client client) {
this.transformer = transformer;
this.schema = schema;
this.idGenerator = idGenerator;
this.client = client;
}

public KnowledgeContext transform(LogicalGraphModel graph) {
Expand All @@ -79,7 +83,7 @@ public KnowledgeContext transform(LogicalGraphModel graph) {
case EntityBuilder.type:
StatefulRange range = ranges.computeIfAbsent(EntityBuilder.type,
s -> new StatefulRange(idGenerator.getNext(EntityBuilder.type, 1000)));
context.add(createEntity(context,range, type, node));
context.add(createEntity(context, range, type, node));
break;
case RefBuilder.type:
break;
Expand All @@ -102,36 +106,71 @@ public KnowledgeContext transform(LogicalGraphModel graph) {
case RelationBuilder.type:
StatefulRange range = ranges.computeIfAbsent(RelationBuilder.type,
s -> new StatefulRange(idGenerator.getNext(RelationBuilder.type, 1000)));
context.add(createEdge(context,range, type, edge));
context.add(createEdge(context, range, type, edge));
break;
}
});
return context;
}

private RelationBuilder createEdge(KnowledgeContext context, StatefulRange range, TransformerRelationType type, LogicalEdge edge) {
RelationBuilder builder = _rel(writerContext.nextRelId(schema,range.next()));
RelationBuilder builder = _rel(writerContext.nextRelId(schema, range.next()));
String physicalLabelKey = type.getLabel();
String labelValue = edge.getLabel();
builder.putProperty(physicalLabelKey, labelValue);

//set sides ids
Optional<EntityBuilder> sideA = writerContext.getContext().findEntityByProperty(TECHNICAL_ID, edge.getSource());
if(!sideA.isPresent())
//todo search Elastic for the given node
throw new IllegalArgumentException(String.format("Source node %s for edge not found %s",edge.getSource(),edge.toString()));
builder.entityAId(sideA.get().id());
builder.entityACategory(sideA.get().category);
Optional<EntityBuilder> sideA = writerContext.getContext().findEntityById(edge.getSource()).isPresent()
? writerContext.getContext().findEntityById(edge.getSource())
: writerContext.getContext().findEntityByTechId(edge.getSource());

Optional<EntityBuilder> sideB = writerContext.getContext().findEntityByProperty(TECHNICAL_ID, edge.getTarget());
if(!sideB.isPresent())
if (!sideA.isPresent()) {
//todo search Elastic for the given node
throw new IllegalArgumentException(String.format("Source node %s for edge not found %s",edge.getTarget(),edge.toString()));
builder.entityBId(sideB.get().id());
builder.entityBCategory(sideB.get().category);
Optional<Map> node = StoreAccessor.findEntityById(TECHNICAL_ID, edge.getSource(), ENTITY, schema, client);

if (node.isPresent()) {
builder.entityAId(node.get().get("id").toString());
builder.entityATechId(node.get().get("techId").toString());
builder.entityACategory(node.get().get("category").toString());
} else
throw new IllegalArgumentException(String.format("Source node %s for edge not found %s", edge.getSource(), edge.toString()));
} else {
builder.entityAId(sideA.get().id());
builder.entityATechId(sideA.get().techId);
builder.entityACategory(sideA.get().category);
}


Optional<EntityBuilder> sideB = writerContext.getContext().findEntityById(edge.getTarget()).isPresent()
? writerContext.getContext().findEntityById(edge.getTarget())
: writerContext.getContext().findEntityByTechId(edge.getTarget());

if (!sideB.isPresent()) {
//search Elastic for the given node
Optional<Map> node = StoreAccessor.findEntityById(TECHNICAL_ID, edge.getTarget(), ENTITY, schema, client);

if (node.isPresent()) {
builder.entityBId(node.get().get("id").toString());
builder.entityBTechId(node.get().get("techId").toString());
builder.entityBCategory(node.get().get("category").toString());
} else
throw new IllegalArgumentException(String.format("Target node %s for edge not found %s", edge.getSource(), edge.toString()));
} else {
builder.entityBId(sideB.get().id());
builder.entityBTechId(sideB.get().techId);
builder.entityBCategory(sideB.get().category);
}

//populate side with relation builder hasEntityRelation
sideA.get().rel(builder,"out");
sideB.get().rel(builder,"in");
if(sideA.isPresent())
sideA.get().rel(builder, "out");
else
context.add(new RelationBuilder.EntityRelationBuilder(builder.entityAId, builder, "out"));

if(sideB.isPresent())
sideB.get().rel(builder, "in");
else
context.add(new RelationBuilder.EntityRelationBuilder(builder.entityBId, builder, "in"));

//set metadata properties
edge.getMetadata().getProperties().forEach((logicalKey, value) -> {
Expand Down Expand Up @@ -159,11 +198,11 @@ private RelationBuilder createEdge(KnowledgeContext context, StatefulRange range
return builder;
}

private EntityBuilder createEntity(KnowledgeContext context,StatefulRange range, TransformerEntityType type, LogicalNode node) {
//todo what if the "id" field is present -> should we use it instead of the auto id generator ??
private EntityBuilder createEntity(KnowledgeContext context, StatefulRange range, TransformerEntityType type, LogicalNode node) {
//if the "id" field is present -> use it in the techId section
EntityBuilder builder = _e(writerContext.nextLogicalId(schema, range.next()));
//technical Id to find the node by (real id is given by the engine sequencer)
builder.putProperty(TECHNICAL_ID,node.getId());
builder.putProperty(TECHNICAL_ID, node.getId());
String physicalLabelKey = type.getLabel();
String labelValue = node.getLabel();
builder.putProperty(physicalLabelKey, labelValue);
Expand Down Expand Up @@ -211,6 +250,7 @@ private ValueBuilder createValueBuilder(StatefulRange propRange, TransformerProp

return valueBuilder;
}

private RvalueBuilder createRValueBuilder(StatefulRange propRange, TransformerProperties properties, String key, Object value) {
RvalueBuilder valueBuilder = _r(writerContext.nextValueId(schema, propRange.next()));
//todo think if TransformerProperties.label pattern is still relevant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ public static <T extends KnowledgeDomainBuilder> int commit(Client client, Strin
public static int commit(Client client, RawSchema schema, ObjectMapper mapper, KnowledgeContext context) throws JsonProcessingException {
int count = 0;
count += commit(client,schema.getPartition(ENTITY).getPartitions().iterator().next().getIndices().iterator().next(),mapper,context.getEntities());
count += commit(client,schema.getPartition(ENTITY).getPartitions().iterator().next().getIndices().iterator().next(),mapper,context.getRelationBuilders());

count += commit(client,schema.getPartition(EVALUE).getPartitions().iterator().next().getIndices().iterator().next(),mapper,context.geteValues());
count += commit(client,schema.getPartition(RELATION).getPartitions().iterator().next().getIndices().iterator().next(),mapper,context.getRelations());
count += commit(client,schema.getPartition(RVALUE).getPartitions().iterator().next().getIndices().iterator().next(),mapper,context.getrValues());
Expand Down

0 comments on commit d1c7fbb

Please sign in to comment.