Skip to content
Permalink
Browse files
add rpc support (#36)
* add rpc server and client: MasterRpcManager,WorkerRpcManager
* add MasterInputManager and WorkerInputManager
* inject rpc service into GraphFetcher
* store managers through map instead of list
* add loadGraph() and mergeGraph() to WorkerInputManager
* add checkInited() for WorkerService and MasterService
  • Loading branch information
javeme committed Apr 14, 2021
1 parent 7f3b750 commit 10b347e8129eb70b713c25c51a2c3ad6114635a4
Showing 29 changed files with 730 additions and 123 deletions.
@@ -16,6 +16,10 @@
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-rpc</artifactId>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-client</artifactId>
@@ -19,11 +19,27 @@

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.worker.Manager;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.rpc.AggregateRpcService;

/**
* Aggregator manager manages aggregators in master.
* TODO: implement
*/
public class MasterAggrManager implements Manager {

public static final String NAME = "master_aggr";

@Override
public String name() {
return NAME;
}

public AggregateRpcService handler() {
//E.checkNotNull(handler, "handler");
// TODO: implement
return new AggregateRpcService() {
// mock
};
}
}
@@ -19,11 +19,23 @@

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.worker.Manager;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.rpc.AggregateRpcService;

/**
* Aggregator manager manages aggregators in worker.
* TODO: implement
*/
public class WorkerAggrManager implements Manager {

public static final String NAME = "worker_aggr";

@Override
public String name() {
return NAME;
}

public void service(AggregateRpcService service) {
// TODO: implement
}
}
@@ -30,8 +30,10 @@

public class ContainerInfo implements Readable, Writable {

public static final int MASTER_ID = -1;

/*
* There is only 1 master, so the id of master is no use.
* There is only 1 master, and the id of master is -1.
* The id of workers start from 0. The id is used to identify a worker.
*/
private int id;
@@ -30,7 +30,6 @@
import com.baidu.hugegraph.computer.core.aggregator.WorkerAggrManager;
import com.baidu.hugegraph.computer.core.graph.partition.HashPartitioner;
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
import com.baidu.hugegraph.computer.core.worker.Computation;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.OptionHolder;
import com.google.common.collect.ImmutableSet;
@@ -278,7 +277,7 @@ public static synchronized ComputerOptions instance() {
"worker-computation is used to compute each vertex " +
"in each superstep.",
disallowEmpty(),
Computation.class
Null.class
);

public static final ConfigOption<Class<?>> WORKER_COMBINER_CLASS =
@@ -79,6 +79,10 @@ private HotConfig extractHotConfig(HugeConfig allConfig) {
return hotConfig;
}

public HugeConfig hugeConfig() {
return this.allConfig;
}

public <R> R get(TypedOption<?, R> option) {
return this.allConfig.get(option);
}
@@ -148,12 +152,21 @@ public String getString(String key, String defaultValue) {
}

/**
* Create object by class option. It throws ComputerException if failed
* to create object.
* Create object by class option.
* It throws ComputerException if failed to create object.
*/
public <T> T createObject(ConfigOption<Class<?>> clazzOption) {
return createObject(clazzOption, true);
}

public <T> T createObject(ConfigOption<Class<?>> clazzOption,
boolean requiredNotNull) {
Class<?> clazz = this.get(clazzOption);
if (clazz == Null.class) {
if (requiredNotNull) {
throw new ComputerException(
"Please config required option '%s'", clazzOption.name());
}
return null;
}
try {
@@ -21,9 +21,13 @@

public interface GraphFetcher {

void close();
InputSplit nextVertexInputSplit();

InputSplit nextEdgeInputSplit();

VertexFetcher vertexFetcher();

EdgeFetcher edgeFetcher();

void close();
}
@@ -24,6 +24,7 @@
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.input.hg.HugeGraphFetcher;
import com.baidu.hugegraph.computer.core.input.hg.HugeInputSplitFetcher;
import com.baidu.hugegraph.computer.core.rpc.InputSplitRpcService;

public class InputSourceFactory {

@@ -37,11 +38,12 @@ public static InputSplitFetcher createInputSplitFetcher(Config config) {
}
}

public static GraphFetcher createGraphFetcher(Config config) {
public static GraphFetcher createGraphFetcher(Config config,
InputSplitRpcService srv) {
String type = config.get(ComputerOptions.INPUT_SOURCE_TYPE);
switch (type) {
case "hugegraph":
return new HugeGraphFetcher(config);
return new HugeGraphFetcher(config, srv);
default:
throw new ComputerException("Unexpected source type %s", type);
}
@@ -23,7 +23,9 @@
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class MasterInputHandler {
import com.baidu.hugegraph.computer.core.rpc.InputSplitRpcService;

public class MasterInputHandler implements InputSplitRpcService {

private final InputSplitFetcher fetcher;
private final Queue<InputSplit> vertexSplits;
@@ -51,12 +53,14 @@ public int createEdgeInputSplits() {
return this.edgeSplits.size();
}

public InputSplit pollVertexInputSplit() {
@Override
public InputSplit nextVertexInputSplit() {
InputSplit split = this.vertexSplits.poll();
return split != null ? split : InputSplit.END_SPLIT;
}

public InputSplit pollEdgeInputSplit() {
@Override
public InputSplit nextEdgeInputSplit() {
InputSplit split = this.edgeSplits.poll();
return split != null ? split : InputSplit.END_SPLIT;
}
@@ -0,0 +1,53 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.util.E;

public class MasterInputManager implements Manager {

public static final String NAME = "master_input";

private InputSplitFetcher fetcher;
private MasterInputHandler handler;

@Override
public String name() {
return NAME;
}

@Override
public void init(Config config) {
this.fetcher = InputSourceFactory.createInputSplitFetcher(config);
this.handler = new MasterInputHandler(this.fetcher);
}

@Override
public void close(Config config) {
this.fetcher.close();
}

public MasterInputHandler handler() {
E.checkNotNull(this.handler, "handler");
return this.handler;
}
}
@@ -0,0 +1,78 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.input;

import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.rpc.InputSplitRpcService;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.E;

public class WorkerInputManager implements Manager {

public static final String NAME = "worker_input";

/*
* InputGraphFetcher include:
* VertexFetcher vertexFetcher;
* EdgeFetcher edgeFetcher;
*/
private GraphFetcher fetcher;
/*
* Service proxy on the client
*/
private InputSplitRpcService service;

@Override
public String name() {
return NAME;
}

@Override
public void init(Config config) {
assert this.service != null;
this.fetcher = InputSourceFactory.createGraphFetcher(config,
this.service);
}

@Override
public void close(Config config) {
this.fetcher.close();
}

public void service(InputSplitRpcService service) {
E.checkNotNull(service, "service");
this.service = service;
}

public void loadGraph() {
// TODO: calls LoadService to load vertices and edges parallel
}

public WorkerStat mergeGraph() {
// TODO: merge the data in partitions parallel, and get workerStat
PartitionStat stat1 = new PartitionStat(0, 100L, 200L,
50L, 60L, 70L);
WorkerStat workerStat = new WorkerStat();
workerStat.add(stat1);
return workerStat;
}
}
@@ -23,7 +23,9 @@
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.input.EdgeFetcher;
import com.baidu.hugegraph.computer.core.input.GraphFetcher;
import com.baidu.hugegraph.computer.core.input.InputSplit;
import com.baidu.hugegraph.computer.core.input.VertexFetcher;
import com.baidu.hugegraph.computer.core.rpc.InputSplitRpcService;
import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.driver.HugeClientBuilder;

@@ -32,13 +34,15 @@ public class HugeGraphFetcher implements GraphFetcher {
private final HugeClient client;
private final VertexFetcher vertexFetcher;
private final EdgeFetcher edgeFetcher;
private final InputSplitRpcService rpcService;

public HugeGraphFetcher(Config config) {
public HugeGraphFetcher(Config config, InputSplitRpcService rpcService) {
String url = config.get(ComputerOptions.HUGEGRAPH_URL);
String graph = config.get(ComputerOptions.HUGEGRAPH_GRAPH_NAME);
this.client = new HugeClientBuilder(url, graph).build();
this.vertexFetcher = new HugeVertexFetcher(config, this.client);
this.edgeFetcher = new HugeEdgeFetcher(config, this.client);
this.rpcService = rpcService;
}

@Override
@@ -55,4 +59,14 @@ public VertexFetcher vertexFetcher() {
public EdgeFetcher edgeFetcher() {
return this.edgeFetcher;
}

@Override
public InputSplit nextVertexInputSplit() {
return this.rpcService.nextVertexInputSplit();
}

@Override
public InputSplit nextEdgeInputSplit() {
return this.rpcService.nextEdgeInputSplit();
}
}
@@ -17,7 +17,7 @@
* under the License.
*/

package com.baidu.hugegraph.computer.core.worker;
package com.baidu.hugegraph.computer.core.manager;

import com.baidu.hugegraph.computer.core.config.Config;

@@ -29,6 +29,11 @@
*/
public interface Manager {

/**
* The unique identify name.
*/
String name();

/**
* Used to add the resources the computation needed. Be called only one
* time before all supersteps start.

0 comments on commit 10b347e

Please sign in to comment.