Skip to content
Permalink
Browse files
Let Sender and Receiver process compute message with type MESSAGE_CLA…
…SS (#63)

* delete ValueFactory
* rename VALUE_CLASS to ALGORITHM_RESULT_CLASS, remove VALUE_TYPE
* add option OUTPUT_VALUE_NAME
* check channel not null in QueuedMessageSender#init
  • Loading branch information
houzhizhen committed Jun 22, 2021
1 parent ea4c743 commit 429e081006545f53d4d5e5ce219e23c24c44be2a
Showing 50 changed files with 441 additions and 553 deletions.
@@ -20,36 +20,30 @@
package com.baidu.hugegraph.computer.core.common;

import com.baidu.hugegraph.computer.core.allocator.Allocator;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
import com.baidu.hugegraph.computer.core.graph.value.ValueFactory;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.computer.core.config.Config;

public final class ComputerContext {

private static volatile ComputerContext INSTANCE;

private final Config config;
private final GraphFactory graphFactory;
private final ValueFactory valueFactory;
private final Allocator allocator;

private ComputerContext(Config config,
GraphFactory graphFactory,
ValueFactory valueFactory,
Allocator allocator) {
this.config = config;
this.graphFactory = graphFactory;
this.valueFactory = valueFactory;
this.allocator = allocator;
}

public static synchronized void initContext(Config config,
GraphFactory graphFactory,
ValueFactory valueFactory,
Allocator allocator) {
INSTANCE = new ComputerContext(config, graphFactory,
valueFactory, allocator);
INSTANCE = new ComputerContext(config, graphFactory, allocator);
}

public static ComputerContext instance() {
@@ -65,10 +59,6 @@ public GraphFactory graphFactory() {
return this.graphFactory;
}

public ValueFactory valueFactory() {
return this.valueFactory;
}

public Allocator allocator() {
return this.allocator;
}
@@ -19,7 +19,6 @@

package com.baidu.hugegraph.computer.core.config;

import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.TypedOption;
@@ -45,10 +44,6 @@ public interface Config {
<T> T createObject(ConfigOption<Class<?>> clazzOption,
boolean requiredNotNull);

String vertexValueName();

ValueType valueType();

Boolean outputVertexAdjacentEdges();

Boolean outputVertexProperties();
@@ -28,6 +28,7 @@
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;

public interface GraphFactory {
@@ -61,4 +62,8 @@ public interface GraphFactory {
<V> List<V> createList(int capacity);

<K, V> Map<K, V> createMap();

Value<?> createValue(byte code);

Value<?> createValue(ValueType type);
}
@@ -38,8 +38,6 @@
// TODO: try to reduce call ComputerContext.instance() directly.
private static final GraphFactory GRAPH_FACTORY =
ComputerContext.instance().graphFactory();
private static final ValueFactory VALUE_FACTORY =
ComputerContext.instance().valueFactory();

private ValueType elemType;
private List<T> values;
@@ -142,7 +140,7 @@ protected void read(RandomAccessInput in, boolean readElemType)

for (int i = 0; i < size; i++) {
@SuppressWarnings("unchecked")
T value = (T) VALUE_FACTORY.createValue(this.elemType);
T value = (T) GRAPH_FACTORY.createValue(this.elemType);
value.read(in);
this.values.add(value);
}

This file was deleted.

@@ -34,8 +34,7 @@ public enum ValueType implements SerialEnum {
ID_VALUE(20, -1, "id"),
ID_VALUE_LIST(30, -1, "idlist"),
ID_VALUE_LIST_LIST(40, -1, "idlistlist"),
LIST_VALUE(80, -1, "list"),
CUSTOM_VALUE(100, -1, "custom");
LIST_VALUE(80, -1, "list");

private final byte code;
// Length in bytes if it's a fixed value type, -1 means not fixed.
@@ -35,7 +35,5 @@ public interface GraphComputeInput extends GraphInput {

Pair<Id, Value<?>> readMessage() throws IOException;

Id readId(RandomAccessInput in) throws IOException;

Value<?> readValue(RandomAccessInput in) throws IOException;
}
@@ -171,7 +171,7 @@ public void repair(ComputerContext context) {

private V newValue(ComputerContext context) {
@SuppressWarnings("unchecked")
V val = (V) context.valueFactory().createValue(this.type);
V val = (V) context.graphFactory().createValue(this.type);
return val;
}
}
@@ -59,32 +59,21 @@ public static synchronized ComputerOptions instance() {
return INSTANCE;
}

public static final ConfigOption<String> VALUE_TYPE =
public static final ConfigOption<Class<?>> ALGORITHM_RESULT_CLASS =
new ConfigOption<>(
"algorithm.value_type",
"The value type of current algorithm, used by " +
"ValueFactory to create value. If value_type is " +
"CUSTOM_VALUE, value_class is used to create value.",
disallowEmpty(),
"NULL"
);

public static final ConfigOption<Class<?>> VALUE_CLASS =
new ConfigOption<>(
"algorithm.value_class",
"The class of user defined value, the value is used as " +
"the algorithm result. Used when value_type is " +
"CUSTOM_VALUE.",
"algorithm.result_class",
"The class of vertex's value, the instance is used to " +
"store computation result for the vertex.",
disallowEmpty(),
Null.class
);

public static final ConfigOption<String> VALUE_NAME =
public static final ConfigOption<Class<?>> ALGORITHM_MESSAGE_CLASS =
new ConfigOption<>(
"algorithm.value_name",
"The algorithm value name of vertex",
"algorithm.message_class",
"The class of message passed when compute vertex.",
disallowEmpty(),
"value"
Null.class
);

public static final ConfigOption<String> INPUT_SOURCE_TYPE =
@@ -176,6 +165,15 @@ public static synchronized ComputerOptions instance() {
4
);

public static final ConfigOption<String> OUTPUT_RESULT_NAME =
new ConfigOption<>(
"output.result_name",
"The value is assigned dynamically by #name() of " +
"instance created by WORKER_COMPUTATION_CLASS.",
disallowEmpty(),
"value"
);

public static final ConfigOption<Boolean> OUTPUT_WITH_ADJACENT_EDGES =
new ConfigOption<>(
"output.with_adjacent_edges",
@@ -219,8 +217,6 @@ public static synchronized ComputerOptions instance() {
);

public static Set<String> REQUIRED_OPTIONS = ImmutableSet.of(
VALUE_TYPE.name(),
VALUE_NAME.name()
);

public static final ConfigOption<String> JOB_ID =
@@ -25,7 +25,6 @@
import org.apache.commons.configuration.MapConfiguration;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.TypedOption;
@@ -58,10 +57,6 @@ private HugeConfig parseOptions(Map<String, String> options) {
private HotConfig extractHotConfig(HugeConfig allConfig) {
// Populate high frequency accessed options into HotConfig
HotConfig hotConfig = new HotConfig();
hotConfig.vertexValueName(
allConfig.get(ComputerOptions.VALUE_NAME));
hotConfig.valueType(ValueType.valueOf(
allConfig.get(ComputerOptions.VALUE_TYPE)));

hotConfig.outputVertexAdjacentEdges(
allConfig.get(ComputerOptions.OUTPUT_WITH_ADJACENT_EDGES));
@@ -169,7 +164,7 @@ public String getString(String key, String defaultValue) {
*/
@Override
public <T> T createObject(ConfigOption<Class<?>> clazzOption) {
return createObject(clazzOption, true);
return this.createObject(clazzOption, true);
}

@Override
@@ -194,16 +189,6 @@ public <T> T createObject(ConfigOption<Class<?>> clazzOption,
}
}

@Override
public String vertexValueName() {
return this.hotConfig.vertexValueName();
}

@Override
public ValueType valueType() {
return this.hotConfig.valueType();
}

@Override
public Boolean outputVertexAdjacentEdges() {
return this.hotConfig.outputVertexAdjacentEdges();
@@ -19,33 +19,12 @@

package com.baidu.hugegraph.computer.core.config;

import com.baidu.hugegraph.computer.core.graph.value.ValueType;

public final class HotConfig {

private String vertexValueName;
private ValueType valueType;

private boolean outputVertexAdjacentEdges;
private boolean outputVertexProperties;
private boolean outputEdgeProperties;

public String vertexValueName() {
return this.vertexValueName;
}

public void vertexValueName(String vertexValueName) {
this.vertexValueName = vertexValueName;
}

public ValueType valueType() {
return this.valueType;
}

public void valueType(ValueType valueType) {
this.valueType = valueType;
}

public boolean outputVertexAdjacentEdges() {
return this.outputVertexAdjacentEdges;
}
@@ -26,6 +26,8 @@
import java.util.UUID;

import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.common.SerialEnum;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.edge.DefaultEdge;
@@ -38,19 +40,27 @@
import com.baidu.hugegraph.computer.core.graph.id.UuidId;
import com.baidu.hugegraph.computer.core.graph.properties.DefaultProperties;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.BooleanValue;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.graph.value.FloatValue;
import com.baidu.hugegraph.computer.core.graph.value.IdValue;
import com.baidu.hugegraph.computer.core.graph.value.IdValueList;
import com.baidu.hugegraph.computer.core.graph.value.IdValueListList;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.graph.value.ListValue;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
import com.baidu.hugegraph.computer.core.graph.value.NullValue;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueFactory;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.computer.core.graph.vertex.DefaultVertex;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;

public final class BuiltinGraphFactory implements GraphFactory {

private final Config config;
private ValueFactory valueFactory;

public BuiltinGraphFactory(Config config, ValueFactory valueFactory) {
public BuiltinGraphFactory(Config config) {
this.config = config;
this.valueFactory = valueFactory;
}

@Override
@@ -128,6 +138,42 @@ public <K, V> Map<K, V> createMap() {

@Override
public Properties createProperties() {
return new DefaultProperties(this, this.valueFactory);
return new DefaultProperties(this);
}

public Value<?> createValue(byte code) {
ValueType type = SerialEnum.fromCode(ValueType.class, code);
return createValue(type);
}

/**
* Create property value by type.
*/
public Value<?> createValue(ValueType type) {
switch (type) {
case NULL:
return NullValue.get();
case BOOLEAN:
return new BooleanValue();
case INT:
return new IntValue();
case LONG:
return new LongValue();
case FLOAT:
return new FloatValue();
case DOUBLE:
return new DoubleValue();
case ID_VALUE:
return new IdValue();
case ID_VALUE_LIST:
return new IdValueList();
case ID_VALUE_LIST_LIST:
return new IdValueListList();
case LIST_VALUE:
return new ListValue<>();
default:
throw new ComputerException("Can't create Value for %s",
type.name());
}
}
}

0 comments on commit 429e081

Please sign in to comment.