Skip to content
Permalink
Browse files
improve wcc test and add property(key) (#94)
* improve wcc test
* add property(key) for vertex and edge
  • Loading branch information
coderzc committed Aug 27, 2021
1 parent 4642742 commit 4f1a5993fd886158842fbb2737689c1ba9b81bce
Showing 7 changed files with 28 additions and 94 deletions.
@@ -22,6 +22,7 @@
import com.baidu.hugegraph.computer.core.allocator.Recyclable;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.Value;

public interface Edge extends Recyclable {

@@ -40,4 +41,6 @@ public interface Edge extends Recyclable {
Properties properties();

void properties(Properties properties);

<T extends Value<T>> T property(String key);
}
@@ -48,6 +48,8 @@ public interface Vertex extends Recyclable {

void properties(Properties properties);

<T extends Value<T>> T property(String key);

boolean active();

void inactivate();
@@ -25,6 +25,7 @@
import com.baidu.hugegraph.computer.core.graph.GraphFactory;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.Value;

public class DefaultEdge implements Edge {

@@ -85,6 +86,11 @@ public void properties(Properties properties) {
this.properties = properties;
}

@Override
public <T extends Value<T>> T property(String key) {
return this.properties.get(key);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -99,6 +99,11 @@ public void properties(Properties properties) {
this.properties = properties;
}

@Override
public <T extends Value<T>> T property(String key) {
return this.properties.get(key);
}

@Override
public boolean active() {
return this.active;
@@ -43,7 +43,7 @@ public class AlgorithmTestBase extends UnitTestBase {

public static void runAlgorithm(String algorithmParams, String ... options)
throws InterruptedException {
final Logger log = Log.logger(algorithmParams);
final Logger log = Log.logger(AlgorithmTestBase.class);
ExecutorService pool = Executors.newFixedThreadPool(2);
CountDownLatch countDownLatch = new CountDownLatch(2);
Throwable[] exceptions = new Throwable[2];
@@ -87,9 +87,9 @@ public void write(Vertex vertex) {
} else {
Iterator<Edge> edges = vertex.edges().iterator();
double totalValue = Streams.stream(edges).map(
(edge) -> ((DoubleValue) edge.properties()
.get(this.weight))
.value())
(edge) -> ((DoubleValue)
edge.property(this.weight))
.value())
.reduce((v1, v2) -> v1 + v2).get();
Assert.assertEquals(totalValue, value.value(), 0.000001);
}
@@ -19,102 +19,20 @@

package com.baidu.hugegraph.computer.algorithm.wcc;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;
import org.slf4j.Logger;

import com.baidu.hugegraph.computer.algorithm.rank.pagerank.PageRankParams;
import com.baidu.hugegraph.computer.algorithm.AlgorithmTestBase;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.master.MasterService;
import com.baidu.hugegraph.computer.core.util.ComputerContextUtil;
import com.baidu.hugegraph.computer.core.worker.MockWorkerService;
import com.baidu.hugegraph.computer.core.worker.WorkerService;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.config.RpcOptions;
import com.baidu.hugegraph.testutil.Assert;
import com.baidu.hugegraph.util.Log;

public class WccTest extends UnitTestBase {

private static final Logger LOG = Log.logger(WccTest.class);
public class WccTest extends AlgorithmTestBase {

@Test
public void testServiceWith1Worker() throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
CountDownLatch countDownLatch = new CountDownLatch(2);
Throwable[] exceptions = new Throwable[2];

pool.submit(() -> {
Map<String, String> params = new HashMap<>();
params.put(RpcOptions.RPC_REMOTE_URL.name(), "127.0.0.1:8090");
params.put(ComputerOptions.JOB_ID.name(), "local_002");
params.put(ComputerOptions.JOB_WORKERS_COUNT.name(), "1");
params.put(ComputerOptions.TRANSPORT_SERVER_PORT.name(), "8086");
params.put(ComputerOptions.BSP_REGISTER_TIMEOUT.name(), "100000");
params.put(ComputerOptions.BSP_LOG_INTERVAL.name(), "30000");
params.put(ComputerOptions.BSP_MAX_SUPER_STEP.name(), "10");
params.put(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(),
WccParams.class.getName());

Config config = ComputerContextUtil.initContext(params);

WorkerService workerService = new MockWorkerService();
try {
Thread.sleep(2000L);
workerService.init(config);
workerService.execute();
} catch (Throwable e) {
LOG.error("Failed to start worker", e);
exceptions[0] = e;
} finally {
workerService.close();
countDownLatch.countDown();
}
});

pool.submit(() -> {
Map<String, String> params = new HashMap<>();
params.put(RpcOptions.RPC_SERVER_HOST.name(), "localhost");
params.put(RpcOptions.RPC_SERVER_PORT.name(), "8090");
params.put(ComputerOptions.JOB_ID.name(), "local_002");
params.put(ComputerOptions.JOB_WORKERS_COUNT.name(), "1");
params.put(ComputerOptions.BSP_REGISTER_TIMEOUT.name(), "100000");
params.put(ComputerOptions.BSP_LOG_INTERVAL.name(), "30000");
params.put(ComputerOptions.BSP_MAX_SUPER_STEP.name(), "10");
params.put(ComputerOptions.ALGORITHM_PARAMS_CLASS.name(),
PageRankParams.class.getName());

Config config = ComputerContextUtil.initContext(params);

MasterService masterService = new MasterService();
try {
masterService.init(config);
masterService.execute();
} catch (Throwable e) {
LOG.error("Failed to start master", e);
exceptions[1] = e;
} finally {
/*
* It must close the service first. The pool will be shutdown
* if count down is executed first, and the server thread in
* master service will not be closed.
*/
masterService.close();
countDownLatch.countDown();
}
});

countDownLatch.await();
pool.shutdownNow();

Assert.assertFalse(Arrays.asList(exceptions).toString(),
existError(exceptions));
runAlgorithm(WccParams.class.getName(),
ComputerOptions.JOB_ID.name(), "local_002",
ComputerOptions.JOB_WORKERS_COUNT.name(), "1",
ComputerOptions.BSP_REGISTER_TIMEOUT.name(), "100000",
ComputerOptions.BSP_LOG_INTERVAL.name(), "30000",
ComputerOptions.BSP_MAX_SUPER_STEP.name(), "10");
}
}

0 comments on commit 4f1a599

Please sign in to comment.