Skip to content
Permalink
Browse files
fix FileGraphPartition and some warnings (#90)
* improve FileGraphPartition
* fix warnings
* fix warnings of test
* fix MessageInputTest
* fix tow workers shared one data_dirs
  • Loading branch information
javeme committed Aug 17, 2021
1 parent 3a3483e commit b6341ca82f35f16160b5df7c5fa0fe110d788086
Showing 37 changed files with 251 additions and 196 deletions.
@@ -25,7 +25,6 @@
public class ValueMinCombiner<T extends Value<T>> implements Combiner<T> {

@Override
@SuppressWarnings("unchecked")
public T combine(T v1, T v2) {
E.checkArgumentNotNull(v1, "The combine parameter v1 can't be null");
E.checkArgumentNotNull(v2, "The combine parameter v2 can't be null");
@@ -27,6 +27,8 @@

public class DoubleValue extends Number implements Value<DoubleValue> {

private static final long serialVersionUID = -524902178200973565L;

private double value;

public DoubleValue() {
@@ -27,6 +27,8 @@

public class FloatValue extends Number implements Value<FloatValue> {

private static final long serialVersionUID = 6098857579782490901L;

private float value;

public FloatValue() {
@@ -27,6 +27,8 @@

public class IntValue extends Number implements Value<IntValue> {

private static final long serialVersionUID = -2014388310992178979L;

private int value;

public IntValue() {
@@ -45,7 +47,7 @@ public long longValue() {

@Override
public float floatValue() {
return (float) this.value;
return this.value;
}

@Override
@@ -27,6 +27,8 @@

public class LongValue extends Number implements Value<LongValue> {

private static final long serialVersionUID = 8332327679205404212L;

private long value;

public LongValue() {
@@ -45,12 +47,12 @@ public long longValue() {

@Override
public float floatValue() {
return (float) this.value;
return this.value;
}

@Override
public double doubleValue() {
return (double) this.value;
return this.value;
}

public LongValue(long value) {
@@ -56,7 +56,7 @@ protected RecyclerReference<T> newObject(
Recycler.Handle<RecyclerReference<T>> handle) {
T recyclable = supplier.get();
return new RecyclerReference<>(recyclable,
new RecycleHandler(handle));
new RecycleHandler<>(handle));
}
};
}
@@ -203,11 +203,6 @@ public void waitWorkersCloseDone() {
LOG.info("Master waited workers close-done");
}

public void clean() {
this.bspClient().clean();
LOG.info("Cleaned up the BSP data");
}

private List<byte[]> waitOnWorkersEvent(String prefix, long timeout) {
return this.bspClient().getChildren(prefix, this.workerCount(),
timeout, this.logInterval());
@@ -97,11 +97,6 @@ public List<ContainerInfo> waitMasterAllInitDone() {
return containers;
}

public void clean() {
this.bspClient().clean();
LOG.info("Cleaned up the BSP data");
}

/**
* The master set this signal to let workers knows the first superstep to
* start with.
@@ -21,6 +21,7 @@

import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.util.Log;
@@ -77,6 +78,19 @@ public void close() {
this.bspClient.type(), this.bspClient.endpoint(), this.jobId);
}

/**
* Cleaned up the BSP data
*/
public void clean() {
try {
this.bspClient().clean();
} catch (Exception e) {
throw new ComputerException("Failed to clean up the BSP data: %s",
e, this.bspClient().endpoint());
}
LOG.info("Cleaned up the BSP data: %s", this.bspClient().endpoint());
}

private BspClient createBspClient() {
// TODO: create from factory. the type of bsp can be get from config
return new EtcdBspClient(this.config);
@@ -64,20 +64,22 @@ public ComputeManager(ComputerContext context, Managers managers,
public WorkerStat input() {
WorkerStat workerStat = new WorkerStat();
this.recvManager.waitReceivedAllMessages();
Map<Integer, PeekableIterator<KvEntry>> vertices;
vertices = this.recvManager.vertexPartitions();
Map<Integer, PeekableIterator<KvEntry>> edges;
edges = this.recvManager.edgePartitions();

Map<Integer, PeekableIterator<KvEntry>> vertices =
this.recvManager.vertexPartitions();
Map<Integer, PeekableIterator<KvEntry>> edges =
this.recvManager.edgePartitions();
// TODO: parallel input process
for (Map.Entry<Integer, PeekableIterator<KvEntry>> entry :
vertices.entrySet()) {
FileGraphPartition<M> partition = new FileGraphPartition<>(
this.context, this.managers,
entry.getKey());
PartitionStat partitionStat = partition.init(entry.getValue(),
edges.get(entry.getKey()));
int partition = entry.getKey();
FileGraphPartition<M> part = new FileGraphPartition<>(this.context,
this.managers,
partition);
PartitionStat partitionStat = part.input(entry.getValue(),
edges.get(partition));
workerStat.add(partitionStat);
this.partitions.put(entry.getKey(), partition);
this.partitions.put(partition, part);
}
return workerStat;
}
@@ -87,7 +89,7 @@ public WorkerStat input() {
* corresponding partition. Be called before
* {@link MessageRecvManager#beforeSuperstep} is called.
*/
public void takeComputeMessages() {
public void takeRecvedMessages() {
Map<Integer, PeekableIterator<KvEntry>> messages =
this.recvManager.messagePartitions();
for (FileGraphPartition<M> partition : this.partitions.values()) {

0 comments on commit b6341ca

Please sign in to comment.