Skip to content

Commit

Permalink
implement GraphStoreService api for maxgraph && add snapshot (#537)
Browse files Browse the repository at this point in the history
* implement GraphStoreService api for maxgraph && add snapshot function
* guarantee that one query uses the same snapshot id
* refine cached graph schema structure
  • Loading branch information
shirly121 committed Jul 15, 2021
1 parent e28c441 commit ca65eda
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 19 deletions.
70 changes: 70 additions & 0 deletions interactive_engine/src/gaia-adaptor/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the gaia-adaptor module. It depends on the gremlin-server-plugin
     and maxgraph v2 artifacts and packages everything into a single shaded jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>maxgraph-src</artifactId>
<groupId>com.alibaba.maxgraph</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>gaia-adaptor</artifactId>

<dependencies>
<!-- Artifact from the com.alibaba.graphscope group; presumably provides the GraphStoreService
     API implemented by this module (confirm against that artifact). -->
<dependency>
<groupId>com.alibaba.graphscope</groupId>
<artifactId>gremlin-server-plugin</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- maxgraph v2 artifact; presumably the source of the schema classes used here (confirm). -->
<dependency>
<groupId>com.alibaba.maxgraph</groupId>
<artifactId>v2</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Runs the shade goal during the package phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<!-- Applies to every artifact: leave jar signature files out of the shaded jar. -->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Service-descriptor handling is delegated to the shade plugin's ServicesResourceTransformer. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<!-- Guava and protobuf packages are moved under the "shaded." prefix; presumably to avoid
     version clashes with other copies on the runtime classpath (TODO confirm). -->
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>shaded.com.google.common</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>shaded.com.google.protobuf</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.alibaba.graphscope.gaia;

import com.alibaba.maxgraph.compiler.api.schema.GraphSchema;
import com.alibaba.maxgraph.compiler.api.schema.SchemaFetcher;
import org.apache.commons.lang3.tuple.Pair;

/**
 * Caches the most recently fetched graph schema together with its snapshot id.
 *
 * <p>Both values are {@code null} until the first call to {@link #update()} that obtains a
 * non-null pair from the {@link SchemaFetcher}. The cached fields are {@code volatile} so that a
 * value stored by {@link #update()} on one thread is visible to the unsynchronized getters on
 * other threads.
 *
 * <p>Note that the schema and the snapshot id are read through two separate getters; a caller
 * that invokes both while another thread runs {@link #update()} may observe values from two
 * different refreshes.
 */
public class CachedMaxGraphMeta {
    private final SchemaFetcher schemaFetcher;
    // volatile: written inside update(), read by the getters without holding the monitor
    private volatile GraphSchema graphSchema;
    private volatile Long snapshotId;

    /**
     * @param schemaFetcher source of the (schema, snapshot id) pair used by {@link #update()}
     */
    public CachedMaxGraphMeta(SchemaFetcher schemaFetcher) {
        this.schemaFetcher = schemaFetcher;
    }

    /**
     * Fetches the current (schema, snapshot id) pair and stores it.
     *
     * <p>If the fetcher returns {@code null}, the previously cached values are left untouched.
     * The method is synchronized so that concurrent refreshes do not interleave their writes.
     */
    public synchronized void update() {
        Pair<GraphSchema, Long> pair = this.schemaFetcher.getSchemaSnapshotPair();
        if (pair != null) {
            this.graphSchema = pair.getLeft();
            this.snapshotId = pair.getRight();
        }
    }

    /**
     * @return the cached schema, or {@code null} if no pair has been stored yet
     */
    public GraphSchema getGraphSchema() {
        return graphSchema;
    }

    /**
     * @return the cached snapshot id, or {@code null} if no pair has been stored yet
     */
    public Long getSnapshotId() {
        return snapshotId;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.alibaba.graphscope.gaia;

import com.alibaba.graphscope.gaia.store.GraphStoreService;
import com.alibaba.graphscope.gaia.store.SchemaNotFoundException;
import com.alibaba.maxgraph.compiler.api.schema.GraphSchema;
import com.alibaba.maxgraph.compiler.api.schema.SchemaFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link GraphStoreService} implementation that answers label/property lookups from a cached
 * MaxGraph schema and exposes the snapshot id that was cached together with that schema.
 *
 * <p>The cache is empty until {@link #updateSnapShotId()} has stored a schema/snapshot pair.
 */
public class MaxGraphStore extends GraphStoreService {
    private static final Logger logger = LoggerFactory.getLogger(MaxGraphStore.class);
    public static final String MAXGRAPH_MODERN_PROPERTY_RESOURCE = "maxgraph.modern.properties.json";
    private final CachedMaxGraphMeta cachedGraphSchemaPair;

    /**
     * @param schemaFetcher fetcher handed to the internal {@link CachedMaxGraphMeta}
     */
    public MaxGraphStore(SchemaFetcher schemaFetcher) {
        super(MAXGRAPH_MODERN_PROPERTY_RESOURCE);
        this.cachedGraphSchemaPair = new CachedMaxGraphMeta(schemaFetcher);
    }

    /**
     * Resolves a label name to its id using the cached schema.
     *
     * @throws SchemaNotFoundException if the lookup fails for any reason (including a schema
     *                                 that has not been cached yet)
     */
    @Override
    public long getLabelId(String label) {
        try {
            GraphSchema graphSchema = this.cachedGraphSchemaPair.getGraphSchema();
            return graphSchema.getElement(label).getLabelId();
        } catch (Exception e) {
            String message = "label " + label + " is invalid, please check schema";
            // log the caught exception as well so the root cause is not lost
            logger.error(message, e);
            throw new SchemaNotFoundException(message);
        }
    }

    /**
     * Resolves a label id to its name using the cached schema. Lookup failures are not
     * translated; whatever the schema lookup throws propagates to the caller.
     */
    @Override
    public String getLabel(long labelId) {
        GraphSchema graphSchema = this.cachedGraphSchemaPair.getGraphSchema();
        return graphSchema.getElement((int) labelId).getLabel();
    }

    /**
     * Not supported by this store.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public long getGlobalId(long labelId, long propertyId) {
        throw new UnsupportedOperationException();
    }

    /**
     * Resolves a property name to its id using the cached schema.
     *
     * @throws SchemaNotFoundException if the lookup fails for any reason (including a schema
     *                                 that has not been cached yet)
     */
    @Override
    public int getPropertyId(String propertyName) {
        try {
            GraphSchema graphSchema = this.cachedGraphSchemaPair.getGraphSchema();
            return graphSchema.getPropertyId(propertyName);
        } catch (Exception e) {
            String message = "property " + propertyName + " is invalid, please check schema";
            // log the caught exception as well so the root cause is not lost
            logger.error(message, e);
            throw new SchemaNotFoundException(message);
        }
    }

    /**
     * Resolves a property id to its name using the cached schema. Lookup failures are not
     * translated; whatever the schema lookup throws propagates to the caller.
     */
    @Override
    public String getPropertyName(int propertyId) {
        GraphSchema graphSchema = this.cachedGraphSchemaPair.getGraphSchema();
        return graphSchema.getPropertyName(propertyId);
    }

    /**
     * Returns the snapshot id cached by the last successful {@link #updateSnapShotId()}.
     *
     * @throws IllegalStateException if no snapshot id has been cached yet (previously this
     *                               surfaced as a bare NullPointerException from unboxing)
     */
    @Override
    public long getSnapShotId() {
        Long snapshotId = this.cachedGraphSchemaPair.getSnapshotId();
        if (snapshotId == null) {
            throw new IllegalStateException(
                    "snapshot id is not available, updateSnapShotId() has not cached one yet");
        }
        return snapshotId;
    }

    /**
     * Refreshes the cached schema and snapshot id from the schema fetcher.
     */
    @Override
    public synchronized void updateSnapShotId() {
        this.cachedGraphSchemaPair.update();
    }
}
2 changes: 2 additions & 0 deletions interactive_engine/src/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
<module>executor</module>
<module>v2</module>
<module>data_load_tools</module>
<module>../../research/gaia/gremlin/compiler</module>
<module>gaia-adaptor</module>
</modules>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface GaiaConfig {
int DEFAULT_PEGASUS_WORKER_NUM = 1;
int DEFAULT_PEGASUS_SERVER_NUM = 0;
int DEFAULT_PEGASUS_TIMEOUT = 60000;
GraphType DEFAULT_GRAPH_TYPE = GraphType.VINEYARD;
GraphType DEFAULT_GRAPH_TYPE = GraphType.MAXGRAPH;
String DEFAULT_SCHEMA_PATH = ".";
boolean DEFAULT_OPT_FLAG = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.graphscope.gaia.plan;

import com.alibaba.graphscope.common.proto.Common;
import com.alibaba.graphscope.common.proto.Gremlin;
import com.alibaba.graphscope.gaia.idmaker.IdMaker;
import com.alibaba.graphscope.gaia.plan.extractor.PropertyExtractor;
Expand All @@ -23,6 +24,7 @@
import com.alibaba.graphscope.gaia.plan.strategy.global.TransformTraverserStep;
import com.alibaba.graphscope.gaia.FilterHelper;
import com.alibaba.graphscope.gaia.plan.strategy.global.property.cache.ToFetchProperties;
import com.alibaba.graphscope.gaia.plan.translator.builder.PlanConfig;
import com.alibaba.pegasus.builder.JobBuilder;
import com.alibaba.pegasus.builder.ReduceBuilder;
import com.alibaba.graphscope.gaia.plan.extractor.TagKeyExtractorFactory;
Expand All @@ -36,6 +38,7 @@
import com.alibaba.graphscope.gaia.plan.translator.builder.StepBuilder;
import com.alibaba.graphscope.gaia.plan.translator.builder.TraversalBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.Option;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
Expand Down Expand Up @@ -129,16 +132,21 @@ protected Object getStepResource(Step t, Configuration conf) {
stepPlanMap.put(STEP.CachePropGaiaGraphStep, new GremlinStepResource() {
@Override
protected Object getStepResource(Step t, Configuration conf) {
Gremlin.QueryParams params = StoreParamsBuider.newBuilder()
StoreParamsBuider paramsBuider = StoreParamsBuider.newBuilder()
.setGraphLabels(((CachePropGaiaGraphStep) t).getGraphLabels())
.setPredicates(new PredicateTranslator(new HasContainerP((CachePropGaiaGraphStep) t)).translate())
.setRequiredProperties(((CachePropGaiaGraphStep) t).cacheProperties())
.build();
.setRequiredProperties(((CachePropGaiaGraphStep) t).cacheProperties());
// set snapshot id if present
Long snapshotId = (Long) conf.getProperty(PlanConfig.SNAPSHOT_ID);
if (snapshotId != null) {
Common.Value value = Common.Value.newBuilder().setI64(snapshotId).build();
paramsBuider.setExtraParams(Collections.singletonMap(PlanConfig.SNAPSHOT_ID, value));
}
return Gremlin.GraphStep.newBuilder()
.addAllIds(PlanUtils.extractIds(((CachePropGaiaGraphStep) t).getIds()))
.setReturnType(((GraphStep) t).returnsVertex() ? Gremlin.EntityType.VERTEX : Gremlin.EntityType.EDGE)
.addTraverserRequirements(Gremlin.TraverserRequirement.valueOf(((CachePropGaiaGraphStep) t).getTraverserRequirement().name()))
.setQueryParams(params)
.setQueryParams(paramsBuider.build())
.build();
}
});
Expand Down Expand Up @@ -209,14 +217,19 @@ protected Object getStepResource(Step t, Configuration conf) {
stepPlanMap.put(STEP.CachePropVertexStep, new GremlinStepResource() {
@Override
protected Object getStepResource(Step t, Configuration conf) {
Gremlin.QueryParams params = StoreParamsBuider.newBuilder()
StoreParamsBuider paramsBuider = StoreParamsBuider.newBuilder()
.setGraphLabels(Arrays.asList(((CachePropVertexStep) t).getEdgeLabels()))
.setRequiredProperties(((CachePropVertexStep) t).cacheProperties())
.build();
.setRequiredProperties(((CachePropVertexStep) t).cacheProperties());
// set snapshot id if present
Long snapshotId = (Long) conf.getProperty(PlanConfig.SNAPSHOT_ID);
if (snapshotId != null) {
Common.Value value = Common.Value.newBuilder().setI64(snapshotId).build();
paramsBuider.setExtraParams(Collections.singletonMap(PlanConfig.SNAPSHOT_ID, value));
}
return Gremlin.VertexStep.newBuilder()
.setReturnType(((CachePropVertexStep) t).returnsVertex() ? Gremlin.EntityType.VERTEX : Gremlin.EntityType.EDGE)
.setDirection(Gremlin.Direction.valueOf(((CachePropVertexStep) t).getDirection().name()))
.setQueryParams(params)
.setQueryParams(paramsBuider.build())
.build();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void apply(Traversal.Admin<?, ?> traversal) {
}
GraphType graphType = config.getGraphType();
// property string -> property id
if (graphType == GraphType.VINEYARD) {
if (graphType == GraphType.MAXGRAPH) {
for (int i = 0; i < steps.size(); ++i) {
Step step = steps.get(i);
if (step instanceof HasContainerHolder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ public class PlanConfig {
public static String QUERY_ID = "query_id";
public static String TAG_ID_MAKER = "tag_id_maker";
public static String QUERY_CONFIG = "query_config";
public static String SNAPSHOT_ID = "snapshot_id";
// Key of the snapshot id extra parameter; its name is aligned with the runtime, which reads it when accessing MaxGraphStore
public static String SNAPSHOT_ID = "SID";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.gaia.result.DefaultResultParser;
import com.alibaba.graphscope.gaia.result.GremlinResultProcessor;
import com.alibaba.graphscope.gaia.store.GraphStoreService;
import com.alibaba.graphscope.gaia.store.GraphType;
import com.alibaba.pegasus.builder.AbstractBuilder;
import com.alibaba.graphscope.gaia.broadcast.AbstractBroadcastProcessor;
import com.alibaba.graphscope.gaia.broadcast.RpcBroadcastProcessor;
Expand Down Expand Up @@ -59,6 +60,9 @@ protected GremlinExecutor.LifeCycle createLifeCycle(Context ctx, Supplier<Gremli
final Map<String, Object> args = msg.getArgs();
final long seto = args.containsKey(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT) ?
((Number) args.get(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT)).longValue() : settings.scriptEvaluationTimeout;
if (config.getGraphType() == GraphType.MAXGRAPH) {
graphStore.updateSnapShotId();
}
return GremlinExecutor.LifeCycle.build()
.scriptEvaluationTimeoutOverride(seto)
.beforeEval(b -> {
Expand All @@ -81,6 +85,9 @@ protected GremlinExecutor.LifeCycle createLifeCycle(Context ctx, Supplier<Gremli
.addConfig(PlanConfig.QUERY_ID, queryId)
.addConfig(PlanConfig.TAG_ID_MAKER, new TagIdMaker((Traversal.Admin) o))
.addConfig(PlanConfig.QUERY_CONFIG, PlanUtils.getDefaultConfig(queryId, config));
if (config.getGraphType() == GraphType.MAXGRAPH) {
traversalBuilder.addConfig(PlanConfig.SNAPSHOT_ID, Long.valueOf(graphStore.getSnapShotId()));
}
AbstractBuilder jobReqBuilder = new TraversalTranslator(traversalBuilder).translate();
PlanUtils.print(jobReqBuilder);
broadcastProcessor.broadcast(jobReqBuilder.build(), new GremlinResultProcessor(ctx, new DefaultResultParser(traversalBuilder, graphStore)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.graphscope.gaia.idmaker.TagIdMaker;
import com.alibaba.graphscope.gaia.plan.PlanUtils;
import com.alibaba.graphscope.gaia.store.GraphStoreService;
import com.alibaba.graphscope.gaia.store.GraphType;
import com.alibaba.pegasus.builder.AbstractBuilder;
import com.alibaba.graphscope.gaia.plan.translator.TraversalTranslator;
import com.alibaba.graphscope.gaia.plan.translator.builder.PlanConfig;
Expand Down Expand Up @@ -52,6 +53,9 @@ protected GremlinExecutor.LifeCycle createLifeCycle(Context ctx, Supplier<Gremli
final Map<String, Object> args = msg.getArgs();
final long seto = args.containsKey(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT) ?
((Number) args.get(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT)).longValue() : settings.scriptEvaluationTimeout;
if (config.getGraphType() == GraphType.MAXGRAPH) {
graphStore.updateSnapShotId();
}
return GremlinExecutor.LifeCycle.build()
.scriptEvaluationTimeoutOverride(seto)
.beforeEval(b -> {
Expand All @@ -70,11 +74,14 @@ protected GremlinExecutor.LifeCycle createLifeCycle(Context ctx, Supplier<Gremli
.withResult(o -> {
if (o != null && o instanceof Traversal) {
long queryId = (long) queryIdMaker.getId(o);
AbstractBuilder jobReqBuilder = new TraversalTranslator((new TraversalBuilder((Traversal.Admin) o))
TraversalBuilder traversalBuilder = new TraversalBuilder((Traversal.Admin) o)
.addConfig(PlanConfig.QUERY_ID, queryId)
.addConfig(PlanConfig.TAG_ID_MAKER, new TagIdMaker((Traversal.Admin) o))
.addConfig(PlanConfig.QUERY_CONFIG, PlanUtils.getDefaultConfig(queryId, config)))
.translate();
.addConfig(PlanConfig.QUERY_CONFIG, PlanUtils.getDefaultConfig(queryId, config));
if (config.getGraphType() == GraphType.MAXGRAPH) {
traversalBuilder.addConfig(PlanConfig.SNAPSHOT_ID, Long.valueOf(graphStore.getSnapShotId()));
}
AbstractBuilder jobReqBuilder = new TraversalTranslator(traversalBuilder).translate();
String content = new String(jobReqBuilder.build().toByteArray(), StandardCharsets.ISO_8859_1);
AbstractGraphOpProcessor.writeResultList(ctx, Arrays.asList(content), ResponseStatusCode.SUCCESS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import com.alibaba.graphscope.gaia.result.GremlinResultProcessor;
import com.alibaba.graphscope.gaia.result.RemoteTraverserResultParser;
import com.alibaba.graphscope.gaia.store.GraphStoreService;
import com.alibaba.graphscope.gaia.store.GraphType;
import com.alibaba.pegasus.builder.AbstractBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
Expand All @@ -29,8 +29,6 @@
import org.slf4j.LoggerFactory;

import javax.script.SimpleBindings;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

Expand Down Expand Up @@ -63,6 +61,9 @@ public ThrowingConsumer<Context> select(Context ctx) throws OpProcessorException
final Map<String, String> aliases = (Map<String, String>) message.optionalArgs(Tokens.ARGS_ALIASES).get();
final String traversalSourceName = aliases.entrySet().iterator().next().getValue();
logger.info("tokens ops is {}", message.getOp());
if (config.getGraphType() == GraphType.MAXGRAPH) {
graphStore.updateSnapShotId();
}
switch (message.getOp()) {
case Tokens.OPS_BYTECODE:
op = (context -> {
Expand All @@ -74,8 +75,10 @@ public ThrowingConsumer<Context> select(Context ctx) throws OpProcessorException
.addConfig(PlanConfig.QUERY_ID, queryId)
.addConfig(PlanConfig.TAG_ID_MAKER, new TagIdMaker((Traversal.Admin) traversal))
.addConfig(PlanConfig.QUERY_CONFIG, PlanUtils.getDefaultConfig(queryId, config));
if (config.getGraphType() == GraphType.MAXGRAPH) {
traversalBuilder.addConfig(PlanConfig.SNAPSHOT_ID, Long.valueOf(graphStore.getSnapShotId()));
}
AbstractBuilder jobReqBuilder = new TraversalTranslator(traversalBuilder).translate();
FileUtils.writeStringToFile(new File("plan.log"), String.format("query-%d", queryId), StandardCharsets.UTF_8, true);
PlanUtils.print(jobReqBuilder);
broadcastProcessor.broadcast(jobReqBuilder.build(),
new GremlinResultProcessor(ctx, new RemoteTraverserResultParser(traversalBuilder, graphStore)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public long getSnapShotId() {
throw new UnsupportedOperationException();
}

/**
 * Default implementation: snapshot refresh is not supported here.
 * Subclasses that track a snapshot id (e.g. MaxGraphStore) override this method.
 *
 * @throws UnsupportedOperationException always
 */
public void updateSnapShotId() {
throw new UnsupportedOperationException();
}

public <P> Optional<P> getVertexProperty(BigInteger id, String key) {
String idStr = String.valueOf(id);
if (getVertexKeys(id).isEmpty()) return Optional.empty();
Expand Down

0 comments on commit ca65eda

Please sign in to comment.