Skip to content
Permalink
Browse files
support olap writeback for cassandra and rocksdb (#1506)
* support olap writeback for cassandra and rocksdb
* add olap property key tests
* clear vertex cache when olap pk changed
* support config data disk for olap pk when using rocksdb backend
* rename read frequency to write type

Change-Id: I947c3a96eb7aa67ee9eff5f7fb248e9f4539e91b
  • Loading branch information
zhoney committed Jul 22, 2021
1 parent 379859a commit ef37b3ac96b1fc8e82bca487b074abf79bba846d
Showing 75 changed files with 2,500 additions and 223 deletions.
@@ -153,7 +153,7 @@
</addDefaultSpecificationEntries>
</manifest>
<manifestEntries>
<Implementation-Version>0.64.0.0</Implementation-Version>
<Implementation-Version>0.65.0.0</Implementation-Version>
</manifestEntries>
</archive>
</configuration>
@@ -57,6 +57,7 @@ public class API {

public static final String ACTION_APPEND = "append";
public static final String ACTION_ELIMINATE = "eliminate";
public static final String ACTION_CLEAR = "clear";

private static final Meter succeedMeter =
MetricsUtil.registerMeter(API.class, "commit-succeed");
@@ -423,8 +423,6 @@ private static class JsonVertex extends JsonElement {

@Override
public void checkCreate(boolean isBatch) {
E.checkArgumentNotNull(this.label,
"The label of vertex can't be null");
this.checkUpdate();
}

@@ -446,8 +444,10 @@ public void checkUpdate() {
public Object[] properties() {
Object[] props = API.properties(this.properties);
List<Object> list = new ArrayList<>(Arrays.asList(props));
list.add(T.label);
list.add(this.label);
if (this.label != null) {
list.add(T.label);
list.add(this.label);
}
if (this.id != null) {
list.add(T.id);
list.add(this.id);
@@ -222,11 +222,11 @@ public Map<String, GraphReadMode> graphReadMode(
@Context GraphManager manager,
@PathParam("name") String name,
GraphReadMode readMode) {
LOG.debug("Set graph read mode to: '{}' of graph '{}'",
LOG.debug("Set graph-read-mode to: '{}' of graph '{}'",
readMode, name);

E.checkArgument(readMode != null,
"Graph read mode can't be null");
"Graph-read-mode can't be null");
HugeGraph g = graph(manager, name);
g.readMode(readMode);
return ImmutableMap.of("graph_read_mode", readMode);
@@ -241,7 +241,7 @@ public Map<String, GraphReadMode> graphReadMode(
public Map<String, GraphReadMode> graphReadMode(
@Context GraphManager manager,
@PathParam("name") String name) {
LOG.debug("Get graph read mode of graph '{}'", name);
LOG.debug("Get graph-read-mode of graph '{}'", name);

HugeGraph g = graph(manager, name);
return ImmutableMap.of("graph_read_mode", g.readMode());
@@ -46,6 +46,7 @@
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.define.Checkable;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.schema.Userdata;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.GraphMode;
@@ -77,9 +78,9 @@ public String create(@Context GraphManager manager,

HugeGraph g = graph(manager, graph);
IndexLabel.Builder builder = jsonIndexLabel.convert2Builder(g);
IndexLabel.CreatedIndexLabel il = builder.createWithTask();
SchemaElement.TaskWithSchema il = builder.createWithTask();
il.indexLabel(mapIndexLabel(il.indexLabel()));
return manager.serializer(g).writeCreatedIndexLabel(il);
return manager.serializer(g).writeTaskWithSchema(il);
}

@PUT
@@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.annotation.security.RolesAllowed;
import javax.inject.Singleton;
@@ -41,20 +42,24 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.api.filter.StatusFilter.Status;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.define.Checkable;
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.schema.Userdata;
import com.baidu.hugegraph.type.define.AggregateType;
import com.baidu.hugegraph.type.define.Cardinality;
import com.baidu.hugegraph.type.define.DataType;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.type.define.ReadFrequency;
import com.baidu.hugegraph.type.define.WriteType;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;

@Path("graphs/{graph}/schema/propertykeys")
@Singleton
@@ -77,8 +82,8 @@ public String create(@Context GraphManager manager,

HugeGraph g = graph(manager, graph);
PropertyKey.Builder builder = jsonPropertyKey.convert2Builder(g);
PropertyKey propertyKey = builder.create();
return manager.serializer(g).writePropertyKey(propertyKey);
SchemaElement.TaskWithSchema pk = builder.createWithTask();
return manager.serializer(g).writeTaskWithSchema(pk);
}

@PUT
@@ -98,15 +103,29 @@ public String update(@Context GraphManager manager,
E.checkArgument(name.equals(jsonPropertyKey.name),
"The name in url(%s) and body(%s) are different",
name, jsonPropertyKey.name);

HugeGraph g = graph(manager, graph);
if (ACTION_CLEAR.equals(action)) {
PropertyKey propertyKey = g.propertyKey(name);
E.checkArgument(propertyKey.olap(),
"Only olap property key can do action clear, " +
"but got '%s'", propertyKey);
Id id = g.clearPropertyKey(propertyKey);
SchemaElement.TaskWithSchema pk =
new SchemaElement.TaskWithSchema(propertyKey, id);
return manager.serializer(g).writeTaskWithSchema(pk);
}

// Parse action parameter
boolean append = checkAndParseAction(action);

HugeGraph g = graph(manager, graph);
PropertyKey.Builder builder = jsonPropertyKey.convert2Builder(g);
PropertyKey propertyKey = append ?
builder.append() :
builder.eliminate();
return manager.serializer(g).writePropertyKey(propertyKey);
SchemaElement.TaskWithSchema pk =
new SchemaElement.TaskWithSchema(propertyKey, IdGenerator.ZERO);
return manager.serializer(g).writeTaskWithSchema(pk);
}

@GET
@@ -153,18 +172,21 @@ public String get(@Context GraphManager manager,

@DELETE
@Timed
@Status(Status.ACCEPTED)
@Path("{name}")
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin", "$owner=$graph $action=property_key_delete"})
public void delete(@Context GraphManager manager,
@PathParam("graph") String graph,
@PathParam("name") String name) {
public Map<String, Id> delete(@Context GraphManager manager,
@PathParam("graph") String graph,
@PathParam("name") String name) {
LOG.debug("Graph [{}] remove property key by name '{}'", graph, name);

HugeGraph g = graph(manager, graph);
// Throw 404 if not exists
g.schema().getPropertyKey(name);
g.schema().propertyKey(name).remove();
return ImmutableMap.of("task_id",
g.schema().propertyKey(name).remove());
}

/**
@@ -183,8 +205,8 @@ private static class JsonPropertyKey implements Checkable {
public DataType dataType;
@JsonProperty("aggregate_type")
public AggregateType aggregateType;
@JsonProperty("read_frequency")
public ReadFrequency readFrequency;
@JsonProperty("write_type")
public WriteType writeType;
@JsonProperty("properties")
public String[] properties;
@JsonProperty("user_data")
@@ -224,8 +246,8 @@ private PropertyKey.Builder convert2Builder(HugeGraph g) {
if (this.aggregateType != null) {
builder.aggregateType(this.aggregateType);
}
if (this.readFrequency != null) {
builder.readFrequency(this.readFrequency);
if (this.writeType != null) {
builder.writeType(this.writeType);
}
if (this.userdata != null) {
builder.userdata(this.userdata);
@@ -240,10 +262,10 @@ private PropertyKey.Builder convert2Builder(HugeGraph g) {
public String toString() {
return String.format("JsonPropertyKey{name=%s, cardinality=%s, " +
"dataType=%s, aggregateType=%s, " +
"readFrequency=%s, properties=%s}",
"writeType=%s, properties=%s}",
this.name, this.cardinality,
this.dataType, this.aggregateType,
this.readFrequency, this.properties);
this.writeType, this.properties);
}
}
}
@@ -184,16 +184,22 @@ public Id getNextId(HugeType type) {
}

@Override
public void addPropertyKey(PropertyKey key) {
public Id addPropertyKey(PropertyKey key) {
verifySchemaPermission(HugePermission.WRITE, key);
this.hugegraph.addPropertyKey(key);
return this.hugegraph.addPropertyKey(key);
}

@Override
public void removePropertyKey(Id key) {
public Id removePropertyKey(Id key) {
PropertyKey pkey = this.hugegraph.propertyKey(key);
verifySchemaPermission(HugePermission.DELETE, pkey);
this.hugegraph.removePropertyKey(key);
return this.hugegraph.removePropertyKey(key);
}

@Override
public Id clearPropertyKey(PropertyKey propertyKey) {
verifySchemaPermission(HugePermission.DELETE, propertyKey);
return this.hugegraph.clearPropertyKey(propertyKey);
}

@Override
@@ -41,6 +41,7 @@
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.traversal.algorithm.CustomizedCrosspointsTraverser.CrosspointsPaths;
import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser.SimilarsMap;
@@ -145,6 +146,32 @@ public String writePropertyKey(PropertyKey propertyKey) {
return JsonUtil.toJson(propertyKey);
}

@Override
public String writeTaskWithSchema(
SchemaElement.TaskWithSchema taskWithSchema) {
StringBuilder builder = new StringBuilder();
long id = taskWithSchema.task() == null ?
0L : taskWithSchema.task().asLong();
SchemaElement schemaElement = taskWithSchema.schemaElement();
String type;
String schema;
if (schemaElement instanceof PropertyKey) {
type = "property_key";
schema = this.writePropertyKey((PropertyKey) schemaElement);
} else if (schemaElement instanceof IndexLabel) {
type = "index_label";
schema = this.writeIndexlabel((IndexLabel) schemaElement);
} else {
throw new HugeException("Invalid schema element '%s' in " +
"TaskWithSchema, only support " +
"[PropertyKey, IndexLabel]", schemaElement);
}
return builder.append("{\"").append(type).append("\": ")
.append(schema)
.append(", \"task_id\": ").append(id).append("}")
.toString();
}

@Override
public String writePropertyKeys(List<PropertyKey> propertyKeys) {
return writeList("propertykeys", propertyKeys);
@@ -180,16 +207,6 @@ public String writeIndexlabels(List<IndexLabel> indexLabels) {
return writeList("indexlabels", indexLabels);
}

@Override
public String writeCreatedIndexLabel(IndexLabel.CreatedIndexLabel cil) {
StringBuilder builder = new StringBuilder();
long id = cil.task() == null ? 0L : cil.task().asLong();
return builder.append("{\"index_label\": ")
.append(this.writeIndexlabel(cil.indexLabel()))
.append(", \"task_id\": ").append(id).append("}")
.toString();
}

@Override
public String writeVertex(Vertex vertex) {
return JsonUtil.toJson(vertex);
@@ -33,6 +33,7 @@
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.traversal.algorithm.CustomizedCrosspointsTraverser.CrosspointsPaths;

@@ -63,7 +64,7 @@ public interface Serializer {

public String writeIndexlabels(List<IndexLabel> indexLabels);

public String writeCreatedIndexLabel(IndexLabel.CreatedIndexLabel cil);
public String writeTaskWithSchema(SchemaElement.TaskWithSchema tws);

public String writeVertex(Vertex v);

@@ -114,10 +114,11 @@ public final class ApiVersion {
* [0.62] Issue-1378: Add compact api for rocksdb/cassandra/hbase backend
* [0.63] Issue-1500: Add user-login RESTful API
* [0.64] Issue-1504: Add auth-project RESTful API
* [0.65] Issue-1506: Support olap property key
*/

// The second parameter of Version.of() is for IDE running without JAR
public static final Version VERSION = Version.of(ApiVersion.class, "0.64");
public static final Version VERSION = Version.of(ApiVersion.class, "0.65");

public static final void check() {
// Check version of hugegraph-core. Firstly do check from version 0.3
@@ -124,6 +124,6 @@ public boolean supportsTtl() {

@Override
public boolean supportsOlapProperties() {
return false;
return true;
}
}
@@ -37,7 +37,9 @@
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.structure.HugeElement;
import com.baidu.hugegraph.structure.HugeIndex;
import com.baidu.hugegraph.structure.HugeProperty;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.DataType;
import com.baidu.hugegraph.type.define.HugeKeys;
@@ -58,6 +60,16 @@ protected TableBackendEntry newBackendEntry(TableBackendEntry.Row row) {
return new CassandraBackendEntry(row);
}

@Override
protected TableBackendEntry newBackendEntry(HugeIndex index) {
TableBackendEntry backendEntry = newBackendEntry(index.type(),
index.id());
if (index.indexLabel().olap()) {
backendEntry.olap(true);
}
return backendEntry;
}

@Override
protected CassandraBackendEntry convertEntry(BackendEntry backendEntry) {
if (!(backendEntry instanceof CassandraBackendEntry)) {
@@ -130,6 +142,21 @@ protected void parseProperties(HugeElement element,
}
}

@Override
public BackendEntry writeOlapVertex(HugeVertex vertex) {
CassandraBackendEntry entry = newBackendEntry(HugeType.OLAP,
vertex.id());
entry.column(HugeKeys.ID, this.writeId(vertex.id()));
HugeProperty<?> prop = vertex.getProperties().values()
.iterator().next();
PropertyKey pk = prop.propertyKey();
entry.subId(pk.id());
entry.column(HugeKeys.PROPERTY_VALUE,
this.writeProperty(pk, prop.value()));
entry.olap(true);
return entry;
}

@Override
protected Object writeProperty(PropertyKey propertyKey, Object value) {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_PROPERTY);

0 comments on commit ef37b3a

Please sign in to comment.