Skip to content
Permalink
Browse files
Add computation interface (#17)
* Add computation interface

* add FilterMapComputation and ReduceComputation

* add MasterComputation
  • Loading branch information
houzhizhen committed Mar 26, 2021
1 parent c8e2100 commit 32390d3898b6c656044d19060dfe71b8c3aeba77
Showing 12 changed files with 849 additions and 0 deletions.
@@ -0,0 +1,80 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.graph.value.Value;

public interface Aggregator<V extends Value> {

/**
* Used by worker to aggregate a new value when compute a vertex, needs to
* be commutative and associative.
* @param value The value to be aggregated
*/
void aggregateValue(V value);

/**
* Used by worker to aggregate an int value when compute a vertex. For
* performance reasons, it can aggregate without create an IntValue object.
*/
default void aggregateValue(int value) {
throw new ComputerException("Not implemented: aggregateValue(int)");
}

/**
* Used by worker to aggregate a long value. For performance reasons, it can
* aggregate without create a LongValue object.
*/
default void aggregateValue(long value) {
throw new ComputerException("Not implemented: aggregateValue(long)");
}

/**
* Used by worker to aggregate a float value. For performance reasons, it
* can aggregate without create a FloatValue object.
*/
default void aggregateValue(float value) {
throw new ComputerException("Not implemented: aggregateValue(float)");
}

/**
* Used by worker to aggregate a double value. For performance reasons,
* it can aggregate without create a DoubleValue object.
*/
default void aggregateValue(double value) {
throw new ComputerException("Not implemented: aggregateValue(double)");
}

/**
* Used by worker or master to get current aggregated value. The worker
* get aggregated value before a superstep. The master can get the
* aggregated value after a superstep.
*/
V aggregatedValue();

/**
* Used by worker or master to set current aggregated value directly. The
* worker set aggregated value and then sent to master for further
* aggregation. The master set aggregated value and then use by worker in
* next superstep.
*/
void aggregatedValue(V value);
}
@@ -0,0 +1,69 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;

/**
* Aggregator4Master used by algorithm's master-computation. The master must
* register all aggregators before all supersteps start. Then the workers can
* aggregate aggregators in superstep, send the aggregators to master when
* superstep finish. The master aggregates the aggregators sent by workers.
* The workers can get the aggregated values master aggregated at next
* superstep. These values are identical among all workers.
*/
public interface Aggregator4Master {

/**
* Register the aggregator with specified name. The name must be unique.
* Used by algorithm's master-computation to register aggregators.
*/
<V extends Value> void registerAggregator(
String name,
Class<? extends Aggregator<V>> aggregatorClass);

/**
* Register aggregator with specified value type and a combiner which can
* combine values with specified value type. The name must be unique.
* Used by algorithm's master-computation to register aggregators.
*/
<V extends Value> void registerAggregator(
String name,
ValueType type,
Class<? extends Combiner<V>> combinerClass);

/**
* Set the aggregated value by master-computation. The value will be
* received by workers at next superstep.
* Throws ComputerException if master does not register the aggregator
* with specified name.
*/
<V extends Value> void aggregatedValue(String name, V value);

/**
* Get the aggregated value. The aggregated value is aggregated from
* workers at this superstep.
* Throws ComputerException if master does not register the aggregator
* with specified name.
*/
<V extends Value> V aggregatedValue(String name);
}
@@ -0,0 +1,48 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.aggregator;

import com.baidu.hugegraph.computer.core.graph.value.Value;

/**
* Aggregator4Worker used by algorithm's computation. The computation can
* only use the aggregator with specified name the master registered.
* See {@link Aggregator4Master} for detailed information about aggregation
* process.
*/
public interface Aggregator4Worker {

/**
* Set aggregate value after superstep. The value will be sent to
* master when current superstep finish.
* Throws ComputerException if master does not register the aggregator
* with specified name.
* @param value The value to be aggregated
*/
<V extends Value> void aggregateValue(String name, V value);

/**
* Get the aggregated value before a superstep start. The value is
* aggregated by master at previous superstep.
* Throws ComputerException if master does not register the aggregator
* with specified name.
*/
<V extends Value> V aggregatedValue(String name);
}
@@ -19,8 +19,21 @@

package com.baidu.hugegraph.computer.core.combiner;

import java.util.Iterator;

public interface Combiner<T> {

public static <T> T combineAll(Combiner<T> combiner, Iterator<T> values) {
if (!values.hasNext()) {
return null;
}
T result = values.next();
while (values.hasNext()) {
result = combiner.combine(result, values.next());
}
return result;
}

/**
* Combine v1 and v2, return the combined value. The combined value may
* take use v1 or v2. The value of v1 and v2 may be updated. Should not
@@ -0,0 +1,45 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.master;

/**
* The default master-computation, which can be used if there is no
* algorithm-specific master-computation.
*/
public class DefaultMasterComputation implements MasterComputation {

@Override
public void init(MasterContext context) {
// pass
}

/**
* Compute until max superstep.
*/
@Override
public boolean compute(MasterContext context) {
return true;
}

@Override
public void close() {
// pass
}
}
@@ -0,0 +1,55 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.master;

/**
* Master-computation is computation that can determine whether to continue
* next superstep. It runs on master. It can perform centralized computation
* between supersteps. It runs after every superstep.
*
* The communication between the master and workers is performed via
* aggregators. The value of aggregators are collected by the master before
* {@link #compute(MasterContext)} ()} is called. This means aggregator
* values used by the workers are consistent with aggregator values from the
* master in the same superstep.
*/
public interface MasterComputation {

/**
* Initialize the master-computation. Register algorithms's aggregators in
* this method. Create resources used in compute. Be called before all
* supersteps start.
*/
void init(MasterContext context);

/**
* Close the resource created in {@link #init(MasterContext)}. Be called
* after all supersteps.
*/
void close();

/**
* The master-algorithm can use aggregators to determine whether to
* continue the next execution or not. Be called at the end of
* a superstep.
* @return true if want to continue the next iteration.
*/
boolean compute(MasterContext context);
}
@@ -0,0 +1,64 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.master;

import com.baidu.hugegraph.computer.core.aggregator.Aggregator4Master;

/**
* The MasterContext is the interface for the algorithm's master-computation.
* Algorithm's master-computation can get aggregators and get graph
* information such as total vertex count and total edge count.
*/
public interface MasterContext extends Aggregator4Master {

/**
* @return the total vertex count of the graph. The value may vary from
* superstep to superstep, because the algorithm may add or delete vertices
* during superstep.
*/
long totalVertexCount();

/**
* @return the total edge count of the graph. The value may vary from
* superstep to superstep, because the algorithm may add or delete edges
* during superstep.
*/
long totalEdgeCount();

/**
* @return the vertex count that is inactive.
*/
long finishedVertexCount();

/**
* @return the message count at current superstep.
*/
long messageCount();

/**
* @return the message received in bytes at current superstep.
*/
long messageBytes();

/**
* @return the current superstep.
*/
int superstep();
}

0 comments on commit 32390d3

Please sign in to comment.