Skip to content
Permalink
Browse files
implement kcore algorithm (#143)
  • Loading branch information
javeme committed Nov 16, 2021
1 parent 66fc6f0 commit 52a2faf91f08eead64ecdb2ecff784702325c314
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 0 deletions.
@@ -0,0 +1,123 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm.community.kcore;

import java.util.Iterator;

import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.worker.Computation;
import com.baidu.hugegraph.computer.core.worker.ComputationContext;
import com.google.common.collect.Iterators;

public class Kcore implements Computation<Id> {

public static final String OPTION_K = "kcore.k";
public static final int K_DEFAULT_VALUE = 3;

private final KcoreValue initValue = new KcoreValue();

private int k = 0;

@Override
public String name() {
return "kcore";
}

@Override
public String category() {
return "community";
}

@Override
public void init(Config config) {
this.k = config.getInt(OPTION_K, K_DEFAULT_VALUE);
}

@Override
public void compute0(ComputationContext context, Vertex vertex) {
KcoreValue value = this.initValue;
value.core((Id) vertex.id().copy());
vertex.value(value);

if (vertex.numEdges() < this.k) {
value.degree(0);
/*
* TODO: send int type message at phase 1, it's different from id
* type of phase 2 (wcc message), need support switch message type.
*/
context.sendMessageToAllEdges(vertex, vertex.id());
vertex.inactivate();
} else {
value.degree(vertex.numEdges());
assert vertex.active();
}
}

@Override
public void compute(ComputationContext context, Vertex vertex,
Iterator<Id> messages) {
KcoreValue value = vertex.value();
if (!value.active()) {
// Ignore messages of deleted vertex
vertex.inactivate();
return;
}

int superstep = context.superstep();
if (superstep <= 2) {
int deleted = Iterators.size(messages);
assert value.active();
if (value.decreaseDegree(deleted) < this.k) {
// From active to inactive, delete self vertex
value.degree(0);
if (superstep == 1) {
context.sendMessageToAllEdges(vertex, vertex.id());
}
vertex.inactivate();
} else {
// From active to active, do wcc from superstep 2
if (superstep == 2) {
// Start wcc
context.sendMessageToAllEdgesIf(vertex, vertex.id(),
(source, target) -> {
return source.compareTo(target) < 0;
});
vertex.inactivate();
} else {
// Keep active at superstep 1 to continue superstep 2
assert superstep == 1;
assert vertex.active();
}
}
} else {
// Do wcc
assert superstep > 2;
Id message = Combiner.combineAll(context.combiner(), messages);
if (value.core().compareTo(message) > 0) {
value.core(message);
context.sendMessageToAllEdges(vertex, message);
}
vertex.inactivate();
}
}
}
@@ -0,0 +1,45 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm.community.kcore;

import java.util.Map;

import com.baidu.hugegraph.computer.algorithm.AlgorithmParams;
import com.baidu.hugegraph.computer.core.combiner.ValueMinCombiner;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.graph.id.BytesId;
import com.baidu.hugegraph.computer.core.output.LimitedLogOutput;

public class KcoreParams implements AlgorithmParams {

@Override
public void setAlgorithmParameters(Map<String, String> params) {
this.setIfAbsent(params, ComputerOptions.WORKER_COMPUTATION_CLASS,
Kcore.class.getName());
this.setIfAbsent(params, ComputerOptions.ALGORITHM_RESULT_CLASS,
KcoreValue.class.getName());
this.setIfAbsent(params, ComputerOptions.ALGORITHM_MESSAGE_CLASS,
BytesId.class.getName());
this.setIfAbsent(params, ComputerOptions.WORKER_COMBINER_CLASS,
ValueMinCombiner.class.getName());
this.setIfAbsent(params, ComputerOptions.OUTPUT_CLASS,
LimitedLogOutput.class.getName());
}
}
@@ -0,0 +1,124 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm.community.kcore;

import java.io.IOException;

import org.apache.commons.lang3.builder.ToStringBuilder;

import com.baidu.hugegraph.computer.core.graph.id.BytesId;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.value.ValueType;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessOutput;
import com.baidu.hugegraph.util.E;

public class KcoreValue implements Value<KcoreValue> {

private int degree;
private Id core;

public KcoreValue() {
this.degree = 0;
this.core = new BytesId();
}

public void degree(int degree) {
assert degree >= 0;
this.degree = degree;
}

public int degree() {
return this.degree;
}

public int decreaseDegree(int decrease) {
assert decrease <= this.degree;
this.degree -= decrease;
return this.degree;
}

public boolean active() {
return this.degree > 0;
}

public void core(Id core) {
this.core = core;
}

public Id core() {
E.checkNotNull(this.core, "core");
return this.core;
}

@Override
public ValueType valueType() {
return ValueType.UNKNOWN;
}

@Override
public void assign(Value<KcoreValue> other) {
throw new UnsupportedOperationException();
}

@Override
public Value<KcoreValue> copy() {
KcoreValue kcoreValue = new KcoreValue();
kcoreValue.core = (Id) this.core.copy();
kcoreValue.degree = this.degree;
return kcoreValue;
}

@Override
public void read(RandomAccessInput in) throws IOException {
this.core.read(in);
this.degree = in.readInt();
}

@Override
public void write(RandomAccessOutput out) throws IOException {
this.core.write(out);
out.writeInt(this.degree);
}

@Override
public int compareTo(KcoreValue other) {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return new ToStringBuilder(this)
.append("core", this.core)
.append("degree", this.degree)
.toString();
}

@Override
public String string() {
return String.valueOf(this.value());
}

@Override
public Object value() {
return this.core;
}
}
@@ -25,6 +25,7 @@

import com.baidu.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityTest;
import com.baidu.hugegraph.computer.algorithm.centrality.pagerank.PageRankTest;
import com.baidu.hugegraph.computer.algorithm.community.kcore.KcoreTest;
import com.baidu.hugegraph.computer.algorithm.community.lpa.LpaTest;
import com.baidu.hugegraph.computer.algorithm.community.trianglecount.TriangleCountTest;
import com.baidu.hugegraph.computer.algorithm.community.wcc.WccTest;
@@ -38,6 +39,7 @@
DegreeCentralityTest.class,
WccTest.class,
LpaTest.class,
KcoreTest.class,
TriangleCountTest.class,
RingsDetectionTest.class,
RingsDetectionWithFilterTest.class
@@ -0,0 +1,32 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.algorithm.community.kcore;

import org.junit.Test;

import com.baidu.hugegraph.computer.algorithm.AlgorithmTestBase;

public class KcoreTest extends AlgorithmTestBase {

@Test
public void testRunAlgorithm() throws InterruptedException {
runAlgorithm(KcoreParams.class.getName());
}
}

0 comments on commit 52a2faf

Please sign in to comment.