Skip to content
Permalink
Browse files
add WorkerService and MasterService (#27)
* rename input to inputStep
* MasterService does not implement MasterContext
* add name in Combiner and Computation
* add category in Computation
  • Loading branch information
houzhizhen committed Apr 9, 2021
1 parent acd3434 commit 3bfed77bcd41292d08aeaeff2295bfc53b010d27
Showing 23 changed files with 1,151 additions and 7 deletions.
@@ -0,0 +1,29 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.worker.Manager;

/**
* Aggregator manager manages aggregators in master.
* TODO: implement
*/
public class MasterAggrManager implements Manager {
}
@@ -0,0 +1,29 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.worker.Manager;

/**
* Aggregator manager manages aggregators in worker.
* TODO: implement
*/
public class WorkerAggrManager implements Manager {
}
@@ -23,6 +23,14 @@

public interface Combiner<T> {

/**
* @return The name of the combiner.
* @return class name by default.
*/
default String name() {
return this.getClass().getName();
}

public static <T> T combineAll(Combiner<T> combiner, Iterator<T> values) {
if (!values.hasNext()) {
return null;
@@ -51,4 +51,5 @@ public final class Constants {
// The mode to write a file
public static final String FILE_MODE_WRITE = "rw";

public static final int INPUT_SUPERSTEP = -1;
}
@@ -43,6 +43,10 @@ public class ContainerInfo implements Readable, Writable {
public ContainerInfo() {
}

public ContainerInfo(int id, String hostname, int rpcPort) {
this(id, hostname, rpcPort, 0);
}

public ContainerInfo(int id, String hostname, int rpcPort, int dataPort) {
E.checkArgumentNotNull(hostname, "The hostname can't be null");
this.id = id;
@@ -26,8 +26,11 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.baidu.hugegraph.computer.core.aggregator.MasterAggrManager;
import com.baidu.hugegraph.computer.core.aggregator.WorkerAggrManager;
import com.baidu.hugegraph.computer.core.graph.partition.HashPartitioner;
import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation;
import com.baidu.hugegraph.computer.core.worker.Computation;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.OptionHolder;
import com.google.common.collect.ImmutableSet;
@@ -260,6 +263,35 @@ public static synchronized ComputerOptions instance() {
HashPartitioner.class
);

public static final ConfigOption<Class<?>> WORKER_COMPUTATION_CLASS =
new ConfigOption<>(
"worker.computation_class",
"The class to create worker-computation object, " +
"worker-computation is used to compute each vertex " +
"in each superstep.",
disallowEmpty(),
Computation.class
);

public static final ConfigOption<Class<?>> WORKER_COMBINER_CLASS =
new ConfigOption<>(
"worker.combiner_class",
"Combiner can combine messages into one value for a " +
"vertex, for example page-rank algorithm can combine " +
"messages of a vertex to a sum value.",
disallowEmpty(),
Null.class
);

public static final ConfigOption<Class<?>> WORKER_AGGREGATOR_MANAGER_CLASS =
new ConfigOption<>(
"worker.aggregator_manager_class",
"Class to create aggregator manager that manages " +
"aggregators in worker.",
disallowEmpty(),
WorkerAggrManager.class
);

public static final ConfigOption<Class<?>> MASTER_COMPUTATION_CLASS =
new ConfigOption<>(
"master.computation_class",
@@ -270,6 +302,15 @@ public static synchronized ComputerOptions instance() {
DefaultMasterComputation.class
);

public static final ConfigOption<Class<?>> MASTER_AGGREGATOR_MANAGER_CLASS =
new ConfigOption<>(
"master.aggregator_manager_class",
"Class to create aggregator manager that manages " +
"aggregators in master.",
disallowEmpty(),
MasterAggrManager.class
);

public static final ConfigOption<String> HUGEGRAPH_URL =
new ConfigOption<>(
"hugegraph.url",
@@ -157,6 +157,9 @@ public String getString(String key, String defaultValue) {
*/
public <T> T createObject(ConfigOption<Class<?>> clazzOption) {
Class clazz = this.get(clazzOption);
if (clazz == Null.class) {
return null;
}
try {
@SuppressWarnings("unchecked")
T instance = (T) clazz.newInstance();
@@ -0,0 +1,32 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.config;

import com.baidu.hugegraph.config.ConfigOption;

/**
* Null is used in ConfigOption<Class> to indicate a null class option.
* The config does not allow the default value null, so this is used as the
* default class value when class option can be null.
* {@link Config#createObject(ConfigOption)} will return null, if the value
* is ${@code Null.class}".
*/
public class Null {
}
@@ -20,6 +20,7 @@
package com.baidu.hugegraph.computer.core.graph;

import java.io.IOException;
import java.util.List;

import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat;
import com.baidu.hugegraph.computer.core.util.JsonUtil;
@@ -141,4 +142,12 @@ public int hashCode() {
public String toString() {
return JsonUtil.toJsonWithClass(this);
}

public static SuperstepStat from(List<WorkerStat> workerStats) {
SuperstepStat superstepStat = new SuperstepStat();
for (WorkerStat workerStat : workerStats) {
superstepStat.increase(workerStat);
}
return superstepStat;
}
}
@@ -19,14 +19,16 @@

package com.baidu.hugegraph.computer.core.master;

import com.baidu.hugegraph.computer.core.config.Config;

/**
* The default master-computation, which can be used if there is no
* algorithm-specific master-computation.
*/
public class DefaultMasterComputation implements MasterComputation {

@Override
public void init(MasterContext context) {
public void init(Config config) {
// pass
}

@@ -19,6 +19,8 @@

package com.baidu.hugegraph.computer.core.master;

import com.baidu.hugegraph.computer.core.config.Config;

/**
* Master-computation is computation that can determine whether to continue
* next superstep. It runs on master. It can perform centralized computation
@@ -37,10 +39,10 @@ public interface MasterComputation {
* this method. Create resources used in compute. Be called before all
* supersteps start.
*/
void init(MasterContext context);
void init(Config config);

/**
* Close the resource created in {@link #init(MasterContext)}. Be called
* Close the resource created in {@link #init(Config)}. Be called
* after all supersteps.
*/
void close();

0 comments on commit 3bfed77

Please sign in to comment.