diff --git a/README.md b/README.md
index a4321b6..bf9560d 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,120 @@
-spark-distributed-louvain-modularity
-====================================
+# dga-graphx
+
+GraphX Algorithms
+
+The dga-graphx package contains several pre-built executable graph algorithms built on Spark using the GraphX framework.
+
+### prerequisites
+
+ * [Spark](http://spark.apache.org/) 0.9.0 or later
+ * [GraphX](http://spark.apache.org/docs/latest/graphx-programming-guide.html)
+ * [Gradle](http://www.gradle.org/)
+
+### build
+
+If necessary, edit the build.gradle file to set your versions of Spark and GraphX, then run:
+
+> gradle clean dist
+
+Check the build/dist folder for dga-graphx-0.1.jar.
+
+
+# Algorithms
+
+## Louvain
+
+### about louvain
+
+Louvain distributed community detection is a parallelized version of this work:
+```
+Fast unfolding of communities in large networks,
+Vincent D Blondel, Jean-Loup Guillaume, Renaud Lambiotte, Etienne Lefebvre,
+Journal of Statistical Mechanics: Theory and Experiment 2008 (10), P10008 (12pp)
+```
+In the original algorithm each vertex examines the communities of its neighbors and chooses a new community based on a function that maximizes the calculated change in modularity. In the distributed version all vertices make this choice simultaneously rather than in serial order, updating the graph state after each change. Because choices are made in parallel some choices will be incorrect and will not maximize modularity; however, after repeated iterations community choices become more stable, and the results closely mirror those of the serial algorithm.
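+
+For reference, the gain the implementation evaluates for each candidate move (the `q` function in LouvainCore.scala) is proportional to the modularity gain from the paper above. With M the total edge weight of the graph, k_i the weighted degree of vertex i, k_i_in the weight of the links from i into the candidate community, and sigma_tot the total weight of that community:
+
+```
+deltaQ ∝ k_i_in - (k_i * sigma_tot) / M
+```
+
+Because M is the same for every candidate, only the relative ordering of these values matters when a vertex picks its best community.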
+
+### running louvain
+
+After building the package (see above) you can execute the Louvain algorithm against an edge list using the provided script:
+
+```
+bin/louvain
+
+Usage: class com.soteradefense.dga.graphx.louvain.Main$ [options] [<property>=<value>....]
+
+  -i | --input
+        input file or path  Required.
+  -o | --output
+        output path  Required
+  -m | --master
+        spark master, local[N] or spark://host:port  default=local
+  -h | --sparkhome
+        SPARK_HOME  Required to run on cluster
+  -n | --jobname
+        job name
+  -p | --parallelism
+        sets spark.default.parallelism and minSplits on the edge file.  default=based on input partitions
+  -x | --minprogress
+        Number of vertices that must change communities for the algorithm to consider progress.  default=2000
+  -y | --progresscounter
+        Number of times the algorithm can fail to make progress before exiting.  default=1
+  -d | --edgedelimiter
+        specify input file edge delimiter.  default=","
+  -j | --jars
+        comma separated list of jars
+  -z | --ipaddress
+        Set to true to convert IP addresses to Long ids.  Defaults to false
+  <property>=<value>....
+```
+
+To run a small local example execute:
+```
+bin/louvain -i examples/small_edges.tsv -o test_output --edgedelimiter "\t" 2> stderr.txt
+```
+
+Spark produces a lot of output, so sending stderr to a log file is recommended. Examine the test_output folder; you should see:
+
+```
+test_output/
+├── level_0_edges
+│   ├── _SUCCESS
+│   └── part-00000
+├── level_0_vertices
+│   ├── _SUCCESS
+│   └── part-00000
+└── qvalues
+    ├── _SUCCESS
+    └── part-00000
+```
+
+```
+cat test_output/level_0_vertices/part-00000
+(7,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:3})
+(4,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(2,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(6,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:4})
+(8,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:3})
+(5,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(9,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:3})
+(3,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(1,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:5})
+
+cat test_output/qvalues/part-00000
+(0,0.4134948096885813)
+```
+
+Note: the output is laid out as if it were in HDFS, even when running locally. For each level you will see an edges directory and a vertices directory. The "level" refers to the number of times the graph has been "community compressed": at level 1, all of the level 0 vertices in community X are represented by a single vertex with VertexID X. For the small example, modularity was maximized with no community compression, so only level 0 was computed. The vertices file shows the state of each vertex, while the edges file specifies the graph structure. The qvalues directory lists the modularity of the graph at each level of compression. For this example you should see the vertices split into two distinct communities (communities 4 and 8) with a final q value of ~0.413.
+
+
+### running louvain on a cluster
+
+To run on a cluster, be sure your input and output paths are of the form "hdfs:///path", and ensure you provide the --master and --sparkhome options. The --jars option is already set by the louvain script itself and need not be supplied.
+
+### parallelism
+
+To change the level of parallelism use the -p or --parallelism option. If this option is not set, parallelism is based on the layout of the input data in HDFS: the number of partitions of the input file sets the level of parallelism.
+
+### advanced
+
+If you would like to include the Louvain algorithm in your own compute pipeline, or to create a custom output format, you can do so by extending the com.soteradefense.dga.graphx.louvain.LouvainHarness class. See HDFSLouvainRunner, which extends LouvainHarness and is called by Main in the example above; a minimal sketch of a custom runner follows.
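+
+For example (a sketch only: CsvLouvainRunner and its output layout are hypothetical, not part of the package), a runner that writes one CSV line per vertex at each level overrides the two save hooks:
+
+```scala
+package com.soteradefense.dga.graphx.louvain
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx._
+
+// Hypothetical runner: saves (id,community,communitySigmaTot) as CSV per level.
+class CsvLouvainRunner(minProgress: Int, progressCounter: Int, outputdir: String)
+  extends LouvainHarness(minProgress, progressCounter) {
+
+  override def saveLevel(sc: SparkContext, level: Int, q: Double, graph: Graph[VertexState, Long]) = {
+    graph.vertices
+      .map({ case (id, v) => id + "," + v.community + "," + v.communitySigmaTot })
+      .saveAsTextFile(outputdir + "/level_" + level + "_csv")
+  }
+
+  override def finalSave(sc: SparkContext, level: Int, q: Double, graph: Graph[VertexState, Long]) = {
+    println(s"final modularity $q after $level compressions")
+  }
+}
+```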
-Spark / graphX implementation of the distributed louvain modularity algorithm
diff --git a/dga-graphx/.DS_Store b/dga-graphx/.DS_Store
new file mode 100644
index 0000000..eb68f43
Binary files /dev/null and b/dga-graphx/.DS_Store differ
diff --git a/dga-graphx/.gradle/1.10/taskArtifacts/cache.properties b/dga-graphx/.gradle/1.10/taskArtifacts/cache.properties
new file mode 100644
index 0000000..b83bb42
--- /dev/null
+++ b/dga-graphx/.gradle/1.10/taskArtifacts/cache.properties
@@ -0,0 +1 @@
+#Fri Mar 21 13:30:02 PDT 2014
diff --git a/dga-graphx/.gradle/1.10/taskArtifacts/cache.properties.lock b/dga-graphx/.gradle/1.10/taskArtifacts/cache.properties.lock
new file mode 100644
index 0000000..b1cb0e4
Binary files /dev/null and b/dga-graphx/.gradle/1.10/taskArtifacts/cache.properties.lock differ
diff --git a/dga-graphx/.gradle/1.10/taskArtifacts/fileHashes.bin b/dga-graphx/.gradle/1.10/taskArtifacts/fileHashes.bin
new file mode 100644
index 0000000..42ce435
Binary files /dev/null and b/dga-graphx/.gradle/1.10/taskArtifacts/fileHashes.bin differ
diff --git a/dga-graphx/.gradle/1.10/taskArtifacts/fileSnapshots.bin b/dga-graphx/.gradle/1.10/taskArtifacts/fileSnapshots.bin
new file mode 100644
index 0000000..57b2fd6
Binary files /dev/null and b/dga-graphx/.gradle/1.10/taskArtifacts/fileSnapshots.bin differ
diff --git a/dga-graphx/.gradle/1.10/taskArtifacts/outputFileStates.bin b/dga-graphx/.gradle/1.10/taskArtifacts/outputFileStates.bin
new file mode 100644
index 0000000..667ee34
Binary files /dev/null and b/dga-graphx/.gradle/1.10/taskArtifacts/outputFileStates.bin differ
diff --git a/dga-graphx/.gradle/1.10/taskArtifacts/taskArtifacts.bin b/dga-graphx/.gradle/1.10/taskArtifacts/taskArtifacts.bin
new file mode 100644
index 0000000..13e366d
Binary files /dev/null and b/dga-graphx/.gradle/1.10/taskArtifacts/taskArtifacts.bin differ
diff --git a/dga-graphx/README.md b/dga-graphx/README.md
new file mode 100644
index 0000000..bf9560d
--- /dev/null
+++ b/dga-graphx/README.md
@@ -0,0 +1,120 @@
+# dga-graphx
+
+GraphX Algorithms
+
+The dga-graphx package contains several pre-built executable graph algorithms built on Spark using the GraphX framework.
+
+### prerequisites
+
+ * [Spark](http://spark.apache.org/) 0.9.0 or later
+ * [GraphX](http://spark.apache.org/docs/latest/graphx-programming-guide.html)
+ * [Gradle](http://www.gradle.org/)
+
+### build
+
+If necessary, edit the build.gradle file to set your versions of Spark and GraphX, then run:
+
+> gradle clean dist
+
+Check the build/dist folder for dga-graphx-0.1.jar.
+
+
+# Algorithms
+
+## Louvain
+
+### about louvain
+
+Louvain distributed community detection is a parallelized version of this work:
+```
+Fast unfolding of communities in large networks,
+Vincent D Blondel, Jean-Loup Guillaume, Renaud Lambiotte, Etienne Lefebvre,
+Journal of Statistical Mechanics: Theory and Experiment 2008 (10), P10008 (12pp)
+```
+In the original algorithm each vertex examines the communities of its neighbors and chooses a new community based on a function that maximizes the calculated change in modularity. In the distributed version all vertices make this choice simultaneously rather than in serial order, updating the graph state after each change.
+Because choices are made in parallel some choices will be incorrect and will not maximize modularity; however, after repeated iterations community choices become more stable, and the results closely mirror those of the serial algorithm.
+
+### running louvain
+
+After building the package (see above) you can execute the Louvain algorithm against an edge list using the provided script:
+
+```
+bin/louvain
+
+Usage: class com.soteradefense.dga.graphx.louvain.Main$ [options] [<property>=<value>....]
+
+  -i | --input
+        input file or path  Required.
+  -o | --output
+        output path  Required
+  -m | --master
+        spark master, local[N] or spark://host:port  default=local
+  -h | --sparkhome
+        SPARK_HOME  Required to run on cluster
+  -n | --jobname
+        job name
+  -p | --parallelism
+        sets spark.default.parallelism and minSplits on the edge file.  default=based on input partitions
+  -x | --minprogress
+        Number of vertices that must change communities for the algorithm to consider progress.  default=2000
+  -y | --progresscounter
+        Number of times the algorithm can fail to make progress before exiting.  default=1
+  -d | --edgedelimiter
+        specify input file edge delimiter.  default=","
+  -j | --jars
+        comma separated list of jars
+  -z | --ipaddress
+        Set to true to convert IP addresses to Long ids.  Defaults to false
+  <property>=<value>....
+```
+
+To run a small local example execute:
+```
+bin/louvain -i examples/small_edges.tsv -o test_output --edgedelimiter "\t" 2> stderr.txt
+```
+
+Spark produces a lot of output, so sending stderr to a log file is recommended. Examine the test_output folder; you should see:
+
+```
+test_output/
+├── level_0_edges
+│   ├── _SUCCESS
+│   └── part-00000
+├── level_0_vertices
+│   ├── _SUCCESS
+│   └── part-00000
+└── qvalues
+    ├── _SUCCESS
+    └── part-00000
+```
+
+```
+cat test_output/level_0_vertices/part-00000
+(7,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:3})
+(4,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(2,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(6,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:4})
+(8,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:3})
+(5,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(9,{community:8,communitySigmaTot:13,internalWeight:0,nodeWeight:3})
+(3,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:4})
+(1,{community:4,communitySigmaTot:21,internalWeight:0,nodeWeight:5})
+
+cat test_output/qvalues/part-00000
+(0,0.4134948096885813)
+```
+
+Note: the output is laid out as if it were in HDFS, even when running locally. For each level you will see an edges directory and a vertices directory. The "level" refers to the number of times the graph has been "community compressed": at level 1, all of the level 0 vertices in community X are represented by a single vertex with VertexID X. For the small example, modularity was maximized with no community compression, so only level 0 was computed. The vertices file shows the state of each vertex, while the edges file specifies the graph structure. The qvalues directory lists the modularity of the graph at each level of compression. For this example you should see the vertices split into two distinct communities (communities 4 and 8) with a final q value of ~0.413.
+
+
+### running louvain on a cluster
+
+To run on a cluster, be sure your input and output paths are of the form "hdfs:///path", and ensure you provide the --master and --sparkhome options. The --jars option is already set by the louvain script itself and need not be supplied.
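+
+An illustrative invocation (the host, paths, and SPARK_HOME below are placeholders):
+
+```
+bin/louvain -i hdfs:///data/edges.tsv -o hdfs:///data/louvain_output -m spark://master:7077 -h /opt/spark --edgedelimiter "\t"
+```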
+
+### parallelism
+
+To change the level of parallelism use the -p or --parallelism option. If this option is not set, parallelism is based on the layout of the input data in HDFS: the number of partitions of the input file sets the level of parallelism.
+
+### advanced
+
+If you would like to include the Louvain algorithm in your own compute pipeline, or to create a custom output format, you can do so by extending the com.soteradefense.dga.graphx.louvain.LouvainHarness class. See HDFSLouvainRunner, which extends LouvainHarness and is called by Main in the example above.
+
diff --git a/dga-graphx/bin/.DS_Store b/dga-graphx/bin/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/dga-graphx/bin/.DS_Store differ
diff --git a/dga-graphx/bin/louvain b/dga-graphx/bin/louvain
new file mode 100755
index 0000000..2bb0d5e
--- /dev/null
+++ b/dga-graphx/bin/louvain
@@ -0,0 +1,7 @@
+#! /bin/bash
+
+T="$(date +%s)"
+java -cp "build/dist/*" com.soteradefense.dga.graphx.louvain.Main --jars build/dist/dga-graphx-0.1.jar,build/dist/spark-graphx_2.10-0.9.0-cdh5.0.0-beta-2.jar "$@"
+
+T="$(($(date +%s)-T))"
+echo "Time in seconds: ${T}"
diff --git a/dga-graphx/build.gradle b/dga-graphx/build.gradle
new file mode 100644
index 0000000..b3ffe18
--- /dev/null
+++ b/dga-graphx/build.gradle
@@ -0,0 +1,37 @@
+description = 'dga-graphx'
+
+apply plugin: 'scala'
+
+version = '0.1'
+
+repositories {
+    mavenLocal()
+    mavenCentral()
+    maven {
+        url "https://repository.cloudera.com/artifactory/cloudera-repos"
+    }
+}
+
+dependencies {
+    compile 'org.scala-lang:scala-library:2.10.3'
+    compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '0.9.0-cdh5.0.0-beta-2'
+    compile group: 'org.apache.spark', name: 'spark-graphx_2.10', version: '0.9.0-cdh5.0.0-beta-2'
+    compile group: 'com.github.scopt', name: 'scopt_2.10', version: '3.2.0'
+    compile(
+        [group: 'org.slf4j', name: 'slf4j-api', version: '1.6.6'],
+        [group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.6.6']
+    )
+}
+
+task dist(dependsOn: 'assemble', type: Copy) {
+    from configurations.runtime
+    from jar
+    include "**/*.jar"
+    into "${buildDir}/dist"
+}
diff --git a/dga-graphx/examples/.DS_Store b/dga-graphx/examples/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/dga-graphx/examples/.DS_Store differ
diff --git a/dga-graphx/examples/convert.py b/dga-graphx/examples/convert.py
new file mode 100644
index 0000000..e7c68b2
--- /dev/null
+++ b/dga-graphx/examples/convert.py
@@ -0,0 +1,32 @@
+import sys
+
+DEL = '\t'
+
+if __name__ == '__main__':
+    if len(sys.argv) != 3:
+        print 'usage: ', sys.argv[0], ' <input_file> <output_file>'
+        sys.exit(1)
+
+    # first pass: assign a sequential integer id to every distinct label
+    nodes = {}
+    curr = 0
+    fobj = open(sys.argv[1], 'r')
+    for line in fobj:
+        line = line.strip().split(DEL)
+        if line[0] not in nodes:
+            nodes[line[0]] = curr
+            curr += 1
+        if line[1] not in nodes:
+            nodes[line[1]] = curr
+            curr += 1
+    fobj.close()
+    print 'Highest label given: ', curr
+
+    # second pass: rewrite the edge list using the integer ids
+    fobj = open(sys.argv[1], 'r')
+    out = open(sys.argv[2], 'w')
+    for line in fobj:
+        line = line.strip().split(DEL)
+        out.write(str(nodes[line[0]]) + DEL + str(nodes[line[1]]) + '\n')
+    fobj.close()
+    out.close()
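+
+# Example run (file names are illustrative):
+#   python convert.py raw_edges.tsv edges_as_ints.tsv
+# Each distinct label in the input becomes a sequential integer id, so the
+# output edge list can be parsed as Long vertex ids by bin/louvain.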
diff --git a/dga-graphx/examples/small_edges.tsv b/dga-graphx/examples/small_edges.tsv
new file mode 100644
index 0000000..d931434
--- /dev/null
+++ b/dga-graphx/examples/small_edges.tsv
@@ -0,0 +1,17 @@
+1 2
+1 3
+1 4
+1 5
+1 6
+2 3
+2 4
+2 5
+3 4
+3 5
+4 5
+6 7
+6 8
+6 9
+7 8
+7 9
+8 9
\ No newline at end of file
diff --git a/dga-graphx/examples/test.tsv b/dga-graphx/examples/test.tsv
new file mode 100644
index 0000000..a3bef72
--- /dev/null
+++ b/dga-graphx/examples/test.tsv
@@ -0,0 +1,57 @@
+1 2
+1 3
+1 4
+1 5
+1 6
+2 3
+2 4
+2 5
+3 4
+3 5
+4 5
+6 7
+6 8
+6 9
+7 8
+7 9
+8 9
+11 12
+11 13
+11 14
+11 15
+11 16
+12 13
+12 14
+12 15
+13 14
+13 15
+14 15
+16 17
+16 18
+16 19
+17 18
+17 19
+18 19
+1 11
+21 22
+21 23
+21 24
+21 25
+21 26
+22 23
+22 24
+22 25
+23 24
+23 25
+24 25
+26 27
+26 28
+26 29
+27 28
+27 29
+28 29
+1 21
+8 18
+8 28
+18 28
+11 21
diff --git a/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/HDFSLouvainRunner.scala b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/HDFSLouvainRunner.scala
new file mode 100644
index 0000000..6eda123
--- /dev/null
+++ b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/HDFSLouvainRunner.scala
@@ -0,0 +1,29 @@
+package com.soteradefense.dga.graphx.louvain
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx._
+import scala.Array.canBuildFrom
+
+/**
+ * Execute the louvain algorithm and save the vertices and edges in hdfs at each level.
+ * Can also save locally if in local mode.
+ *
+ * See LouvainHarness for algorithm details
+ */
+class HDFSLouvainRunner(minProgress:Int,progressCounter:Int,outputdir:String) extends LouvainHarness(minProgress,progressCounter){
+
+  var qValues = Array[(Int,Double)]()
+
+  override def saveLevel(sc:SparkContext,level:Int,q:Double,graph:Graph[VertexState,Long]) = {
+    graph.vertices.saveAsTextFile(outputdir+"/level_"+level+"_vertices")
+    graph.edges.saveAsTextFile(outputdir+"/level_"+level+"_edges")
+    //graph.vertices.map( {case (id,v) => ""+id+","+v.internalWeight+","+v.community }).saveAsTextFile(outputdir+"/level_"+level+"_vertices")
+    //graph.edges.mapValues({case e=>""+e.srcId+","+e.dstId+","+e.attr}).saveAsTextFile(outputdir+"/level_"+level+"_edges")
+    qValues = qValues :+ ((level,q))
+    println(s"qValue: $q")
+
+    // overwrite the q values at each level
+    sc.parallelize(qValues, 1).saveAsTextFile(outputdir+"/qvalues")
+  }
+
+}
\ No newline at end of file
diff --git a/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/IpAddress.scala b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/IpAddress.scala
new file mode 100644
index 0000000..fc8c344
--- /dev/null
+++ b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/IpAddress.scala
@@ -0,0 +1,37 @@
+package com.soteradefense.dga.graphx.louvain
+
+import java.net.InetAddress
+import java.nio.ByteBuffer
+
+object IpAddress {
+
+  def toString(address: Long) = {
+    val byteBuffer = ByteBuffer.allocate(8)
+    val addressBytes = byteBuffer.putLong(address)
+    // The below is needed because we don't have an unsigned Long, and passing a byte array
+    // with more than 4 bytes causes InetAddress to interpret it as a (bad) IPv6 address
+    val tmp = new Array[Byte](4)
+    Array.copy(addressBytes.array, 4, tmp, 0, 4)
+    InetAddress.getByAddress(tmp).getHostAddress()
+  }
+
+  def toLong(_address: String): Long = {
+    val address = try {
+      InetAddress.getByName(_address)
+    } catch {
+      case e: Throwable => throw new IllegalArgumentException("Could not parse address: " + e.getMessage)
+    }
+    val addressBytes = address.getAddress
+    val bb = ByteBuffer.allocate(8)
+    addressBytes.length match {
+      case 4 =>
+        bb.put(Array[Byte](0,0,0,0)) // Need a filler
+        bb.put(addressBytes)
+      case n =>
+        throw new IndexOutOfBoundsException("Expected 4 byte address, got " + n)
+    }
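+    // read the 8 assembled bytes (4 zero bytes + 4 address bytes) back as one Long,
+    // e.g. toLong("10.0.0.1") == 167772161L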
+    bb.getLong(0)
+  }
+
+}
\ No newline at end of file
diff --git a/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainCore.scala b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainCore.scala
new file mode 100644
index 0000000..286f9db
--- /dev/null
+++ b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainCore.scala
@@ -0,0 +1,337 @@
+package com.soteradefense.dga.graphx.louvain
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import scala.reflect.ClassTag
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.graphx.Graph.graphToGraphOps
+import scala.math.BigDecimal.double2bigDecimal
+
+/**
+ * Provides low level louvain community detection algorithm functions. Generally used by LouvainHarness
+ * to coordinate the correct execution of the algorithm through its several stages.
+ *
+ * For details on the sequential algorithm see: Fast unfolding of communities in large networks, Blondel 2008
+ */
+object LouvainCore {
+
+  /**
+   * Generates a new graph of type Graph[VertexState,Long] based on an input graph of type
+   * Graph[VD,Long]. The resulting graph can be used for louvain computation.
+   */
+  def createLouvainGraph[VD: ClassTag](graph: Graph[VD,Long]) : Graph[VertexState,Long]= {
+    // sum the weight of all edges incident to each vertex (its weighted degree)
+    val nodeWeightMapFunc = (e:EdgeTriplet[VD,Long]) => Iterator((e.srcId,e.attr), (e.dstId,e.attr))
+    val nodeWeightReduceFunc = (e1:Long,e2:Long) => e1+e2
+    val nodeWeights = graph.mapReduceTriplets(nodeWeightMapFunc,nodeWeightReduceFunc)
+
+    // place each vertex in its own community, labeled with its weighted degree
+    val louvainGraph = graph.outerJoinVertices(nodeWeights)((vid,data,weightOption)=> {
+      val weight = weightOption.getOrElse(0L)
+      val state = new VertexState()
+      state.community = vid
+      state.changed = false
+      state.communitySigmaTot = weight
+      state.internalWeight = 0L
+      state.nodeWeight = weight
+      state
+    }).partitionBy(PartitionStrategy.EdgePartition2D).groupEdges(_+_)
+
+    louvainGraph
+  }
+
+  /**
+   * Transform a graph from [VD,Long] to a [VertexState,Long] graph and label each vertex with a community
+   * to maximize global modularity (without compressing the graph)
+   */
+  def louvainFromStandardGraph[VD: ClassTag](sc:SparkContext,graph:Graph[VD,Long], minProgress:Int=1,progressCounter:Int=1) : (Double,Graph[VertexState,Long],Int) = {
+    val louvainGraph = createLouvainGraph(graph)
+    louvain(sc,louvainGraph,minProgress,progressCounter)
+  }
+
+  /**
+   * For a graph of type Graph[VertexState,Long] label each vertex with a community to maximize global modularity
+   * (without compressing the graph)
+   */
+  def louvain(sc:SparkContext, graph:Graph[VertexState,Long], minProgress:Int=1,progressCounter:Int=1) : (Double,Graph[VertexState,Long],Int)= {
+    var louvainGraph = graph.cache()
+    val graphWeight = louvainGraph.vertices.values.map(vdata=> vdata.internalWeight+vdata.nodeWeight).reduce(_+_)
+    var totalGraphWeight = sc.broadcast(graphWeight)
+    println("totalEdgeWeight: "+totalGraphWeight.value)
+
+    // gather community information from each vertex's local neighborhood
+    var msgRDD = louvainGraph.mapReduceTriplets(sendMsg,mergeMsg)
+    var activeMessages = msgRDD.count() // materializes the msgRDD and caches it in memory
+
+    var updated = 0L - minProgress
+    var even = false
+    var count = 0
+    val maxIter = 100000
+    var stop = 0
+    var updatedLastPhase = 0L
+    do {
+      count += 1
+      even = !even
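+      // alternate even/odd passes: vertices may only switch to a lower community id on
+      // even passes and to a higher one on odd passes (enforced in louvainVertJoin),
+      // so simultaneous parallel moves cannot oscillate between two communities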
+
+      // label each vertex with its best community based on neighboring community information
+      val labeledVerts = louvainVertJoin(louvainGraph,msgRDD,totalGraphWeight,even).cache()
+
+      // calculate new sigma total value for each community (total weight of each community)
+      val communityUpdate = labeledVerts
+        .map( {case (vid,vdata) => (vdata.community,vdata.nodeWeight+vdata.internalWeight)})
+        .reduceByKey(_+_).cache()
+
+      // map each vertex ID to its updated community information
+      val communityMapping = labeledVerts
+        .map( {case (vid,vdata) => (vdata.community,vid)})
+        .join(communityUpdate)
+        .map({case (community,(vid,sigmaTot)) => (vid,(community,sigmaTot)) })
+        .cache()
+
+      // join the community labeled vertices with the updated community info
+      val updatedVerts = labeledVerts.join(communityMapping).map({ case (vid,(vdata,communityTuple) ) =>
+        vdata.community = communityTuple._1
+        vdata.communitySigmaTot = communityTuple._2
+        (vid,vdata)
+      }).cache()
+      updatedVerts.count()
+      labeledVerts.unpersist(blocking = false)
+      communityUpdate.unpersist(blocking=false)
+      communityMapping.unpersist(blocking=false)
+
+      val prevG = louvainGraph
+      louvainGraph = louvainGraph.outerJoinVertices(updatedVerts)((vid, old, newOpt) => newOpt.getOrElse(old))
+      louvainGraph.cache()
+
+      // gather community information from each vertex's local neighborhood
+      val oldMsgs = msgRDD
+      msgRDD = louvainGraph.mapReduceTriplets(sendMsg, mergeMsg).cache()
+      activeMessages = msgRDD.count() // materializes the graph by forcing computation
+
+      oldMsgs.unpersist(blocking=false)
+      updatedVerts.unpersist(blocking=false)
+      prevG.unpersistVertices(blocking=false)
+
+      // half of the communities can switch on even cycles
+      // and the other half on odd cycles (to prevent deadlocks)
+      // so we only want to look for progress on odd cycles (after all vertices have had a chance to move)
+      if (even) updated = 0
+      updated = updated + louvainGraph.vertices.filter(_._2.changed).count
+      if (!even) {
+        println("  # vertices moved: "+java.text.NumberFormat.getInstance().format(updated))
+        if (updated >= updatedLastPhase - minProgress) stop += 1
+        updatedLastPhase = updated
+      }
+
+    } while ( stop <= progressCounter && (even || (updated > 0 && count < maxIter)))
+    println("\nCompleted in "+count+" cycles")
+
+    // Use each vertex's neighboring community data to calculate the global modularity of the graph
+    val newVerts = louvainGraph.vertices.innerJoin(msgRDD)((vid,vdata,msgs)=> {
+      // sum the node's internal weight and all of its edges that are in its community
+      val community = vdata.community
+      var k_i_in = vdata.internalWeight
+      var sigmaTot = vdata.communitySigmaTot.toDouble
+      msgs.foreach({ case( (communityId,sigmaTotal),communityEdgeWeight ) =>
+        if (vdata.community == communityId) k_i_in += communityEdgeWeight})
+      val M = totalGraphWeight.value
+      val k_i = vdata.nodeWeight + vdata.internalWeight
+      var q = (k_i_in.toDouble / M) - ( ( sigmaTot * k_i) / math.pow(M, 2) )
+      //println(s"vid: $vid community: $community $q = ($k_i_in / $M) - ( ($sigmaTot * $k_i) / math.pow($M, 2) )")
+      if (q < 0) 0 else q
+    })
+    val actualQ = newVerts.values.reduce(_+_)
+
+    // return the modularity value of the graph along with the
+    // graph; vertices are labeled with their community
+    (actualQ,louvainGraph,count/2)
+  }
+
+  /**
+   * Creates the messages passed between each vertex to convey neighborhood community data.
+   */
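+  // Each message maps (communityId, communitySigmaTot) to the weight of the connecting
+  // edge; e.g. a vertex adjacent to community 4 (sigmaTot 21) over an edge of weight 1
+  // receives Map((4L,21L) -> 1L). mergeMsg below sums the weights of duplicate keys.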
+  private def sendMsg(et:EdgeTriplet[VertexState,Long]) = {
+    val m1 = (et.dstId,Map((et.srcAttr.community,et.srcAttr.communitySigmaTot)->et.attr))
+    val m2 = (et.srcId,Map((et.dstAttr.community,et.dstAttr.communitySigmaTot)->et.attr))
+    Iterator(m1, m2)
+  }
+
+  /**
+   * Merge neighborhood community data into a single message for each vertex
+   */
+  private def mergeMsg(m1:Map[(Long,Long),Long],m2:Map[(Long,Long),Long]) = {
+    val newMap = scala.collection.mutable.HashMap[(Long,Long),Long]()
+    m1.foreach({case (k,v)=>
+      if (newMap.contains(k)) newMap(k) = newMap(k) + v
+      else newMap(k) = v
+    })
+    m2.foreach({case (k,v)=>
+      if (newMap.contains(k)) newMap(k) = newMap(k) + v
+      else newMap(k) = v
+    })
+    newMap.toMap
+  }
+
+  /**
+   * Join vertices with community data from their neighborhood and select the best community for each vertex to maximize change in modularity.
+   * Returns a new set of vertices with the updated vertex state.
+   */
+  private def louvainVertJoin(louvainGraph:Graph[VertexState,Long], msgRDD:VertexRDD[Map[(Long,Long),Long]], totalEdgeWeight:Broadcast[Long], even:Boolean) = {
+    louvainGraph.vertices.innerJoin(msgRDD)( (vid, vdata, msgs)=> {
+      var bestCommunity = vdata.community
+      var startingCommunityId = bestCommunity
+      var maxDeltaQ = BigDecimal(0.0)
+      var bestSigmaTot = 0L
+      msgs.foreach({ case( (communityId,sigmaTotal),communityEdgeWeight ) =>
+        val deltaQ = q(startingCommunityId, communityId, sigmaTotal, communityEdgeWeight, vdata.nodeWeight, vdata.internalWeight,totalEdgeWeight.value)
+        //println("  community: "+communityId+" sigma:"+sigmaTotal+" edgeweight:"+communityEdgeWeight+" q:"+deltaQ)
+        if (deltaQ > maxDeltaQ || (deltaQ > 0 && (deltaQ == maxDeltaQ && communityId > bestCommunity))){
+          maxDeltaQ = deltaQ
+          bestCommunity = communityId
+          bestSigmaTot = sigmaTotal
+        }
+      })
+      // only allow moves to a lower community id on even cycles and to a higher id on odd cycles
+      if ( vdata.community != bestCommunity && ( (even && vdata.community > bestCommunity) || (!even && vdata.community < bestCommunity) ) ){
+        //println("  "+vid+" SWITCHED from "+vdata.community+" to "+bestCommunity)
+        vdata.community = bestCommunity
+        vdata.communitySigmaTot = bestSigmaTot
+        vdata.changed = true
+      }
+      else{
+        vdata.changed = false
+      }
+      vdata
+    })
+  }
+
+  /**
+   * Returns the change in modularity that would result from a vertex moving to a specified community.
+   */
+  private def q(currCommunityId:Long, testCommunityId:Long, testSigmaTot:Long, edgeWeightInCommunity:Long, nodeWeight:Long, internalWeight:Long, totalEdgeWeight:Long) : BigDecimal = {
+    val isCurrentCommunity = (currCommunityId.equals(testCommunityId))
+    val M = BigDecimal(totalEdgeWeight)
+    val k_i_in_L = if (isCurrentCommunity) edgeWeightInCommunity + internalWeight else edgeWeightInCommunity
+    val k_i_in = BigDecimal(k_i_in_L)
+    val k_i = BigDecimal(nodeWeight + internalWeight)
+    val sigma_tot = if (isCurrentCommunity) BigDecimal(testSigmaTot) - k_i else BigDecimal(testSigmaTot)
+
+    var deltaQ = BigDecimal(0.0)
+    if (!(isCurrentCommunity && sigma_tot.equals(0.0))) {
+      deltaQ = k_i_in - ( k_i * sigma_tot / M)
+      //println(s"  $deltaQ = $k_i_in - ( $k_i * $sigma_tot / $M")
+    }
+    deltaQ
+  }
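+  // Worked example with illustrative numbers (not taken from the test data): with
+  // totalEdgeWeight M = 34, a vertex of weighted degree k_i = 3 considering a foreign
+  // community with sigma_tot = 10 over edges of total weight 2 scores
+  // deltaQ = 2 - (3 * 10 / 34) ≈ 1.12; the candidate with the largest deltaQ wins.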
+
+  /**
+   * Compress a graph by its communities, aggregating both internal node weights and edge
+   * weights within communities.
+   */
+  def compressGraph(graph:Graph[VertexState,Long],debug:Boolean=true) : Graph[VertexState,Long] = {
+
+    // aggregate the edge weights of self loops: edges with both src and dst in the same community.
+    // WARNING can not use graph.mapReduceTriplets because we are mapping to new vertexIds
+    val internalEdgeWeights = graph.triplets.flatMap(et=>{
+      if (et.srcAttr.community == et.dstAttr.community){
+        Iterator( ( et.srcAttr.community, 2*et.attr) )  // count the weight from both nodes
+      }
+      else Iterator.empty
+    }).reduceByKey(_+_)
+
+    // aggregate the internal weights of all nodes in each community
+    var internalWeights = graph.vertices.values.map(vdata=> (vdata.community,vdata.internalWeight)).reduceByKey(_+_)
+
+    // join internal weights and self edges to find the new internal weight of each community
+    val newVerts = internalWeights.leftOuterJoin(internalEdgeWeights).map({case (vid,(weight1,weight2Option)) =>
+      val weight2 = weight2Option.getOrElse(0L)
+      val state = new VertexState()
+      state.community = vid
+      state.changed = false
+      state.communitySigmaTot = 0L
+      state.internalWeight = weight1+weight2
+      state.nodeWeight = 0L
+      (vid,state)
+    }).cache()
+
+    // translate each vertex edge to a community edge
+    val edges = graph.triplets.flatMap(et=> {
+      val src = math.min(et.srcAttr.community,et.dstAttr.community)
+      val dst = math.max(et.srcAttr.community,et.dstAttr.community)
+      if (src != dst) Iterator(new Edge(src, dst, et.attr))
+      else Iterator.empty
+    }).cache()
+
+    // generate a new graph where each community of the previous
+    // graph is now represented as a single vertex
+    val compressedGraph = Graph(newVerts,edges)
+      .partitionBy(PartitionStrategy.EdgePartition2D).groupEdges(_+_)
+
+    // calculate the weighted degree of each node
+    val nodeWeightMapFunc = (e:EdgeTriplet[VertexState,Long]) => Iterator((e.srcId,e.attr), (e.dstId,e.attr))
+    val nodeWeightReduceFunc = (e1:Long,e2:Long) => e1+e2
+    val nodeWeights = compressedGraph.mapReduceTriplets(nodeWeightMapFunc,nodeWeightReduceFunc)
+
+    // fill in the weighted degree of each node
+    // val louvainGraph = compressedGraph.joinVertices(nodeWeights)((vid,data,weight)=> {
+    val louvainGraph = compressedGraph.outerJoinVertices(nodeWeights)((vid,data,weightOption)=> {
+      val weight = weightOption.getOrElse(0L)
+      data.communitySigmaTot = weight + data.internalWeight
+      data.nodeWeight = weight
+      data
+    }).cache()
+    louvainGraph.vertices.count()
+    louvainGraph.triplets.count() // materialize the graph
+
+    newVerts.unpersist(blocking=false)
+    edges.unpersist(blocking=false)
+    louvainGraph
+  }
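+  // Note on levels: after compressGraph a vertex's id IS the community id it represents,
+  // which is why level N vertices in community X appear at level N+1 as the single
+  // vertex X (see the README's description of the per-level output).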
+
+  // debug printing
+  private def printlouvain(graph:Graph[VertexState,Long]) = {
+    print("\ncommunity label snapshot\n(vid,community,sigmaTot)\n")
+    graph.vertices.mapValues((vid,vdata)=> (vdata.community,vdata.communitySigmaTot)).collect().foreach(f=>println(" "+f))
+  }
+
+  // debug printing
+  private def printedgetriplets(graph:Graph[VertexState,Long]) = {
+    print("\ncommunity label snapshot FROM TRIPLETS\n(vid,community,sigmaTot)\n")
+    (graph.triplets.flatMap(e=> Iterator((e.srcId,e.srcAttr.community,e.srcAttr.communitySigmaTot), (e.dstId,e.dstAttr.community,e.dstAttr.communitySigmaTot))).collect()).foreach(f=>println(" "+f))
+  }
+
+}
\ No newline at end of file
diff --git a/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainHarness.scala b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainHarness.scala
new file mode 100644
index 0000000..3e40895
--- /dev/null
+++ b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/LouvainHarness.scala
@@ -0,0 +1,94 @@
+package com.soteradefense.dga.graphx.louvain
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import scala.reflect.ClassTag
+import org.apache.spark.Logging
+
+/**
+ * Coordinates execution of the louvain distributed community detection process on a graph.
+ *
+ * The input Graph must have an edge type of Long.
+ *
+ * All lower level algorithm functions are in LouvainCore; this class acts to
+ * coordinate calls into LouvainCore and check for convergence criteria.
+ *
+ * Two hooks are provided to allow custom behavior:
+ *   -saveLevel  override to save the graph (vertices/edges) after each phase of the process
+ *   -finalSave  override to specify a final action / save when the algorithm has completed (not necessary if saving at each level)
+ *
+ * High level algorithm description:
+ *
+ * Set up - each vertex in the graph is assigned its own community.
+ * 1. Each vertex attempts to increase graph modularity by changing to a neighboring community, or remaining in its current community.
+ * 2. Repeat step 1 until progress is no longer made.
+ *    - Progress is measured by looking at the decrease in the number of vertices that change their community on each pass.
+ *      If the change in progress is < minProgress more than progressCounter times we exit this level.
+ * 3. saveLevel: each vertex is now labeled with a community.
+ * 4. Compress the graph, representing each community as a single node.
+ * 5. Repeat steps 1-4 on the compressed graph.
+ * 6. Repeat until modularity is no longer improved.
+ *
+ * For details see: Fast unfolding of communities in large networks, Blondel 2008
+ */
+class LouvainHarness(minProgress:Int,progressCounter:Int) {
+
+  def run[VD: ClassTag](sc:SparkContext,graph:Graph[VD,Long]) = {
+
+    var louvainGraph = LouvainCore.createLouvainGraph(graph)
+
+    var level = -1   // number of times the graph has been compressed
+    var q = -1.0     // current modularity value
+    var halt = false
+    do {
+      level += 1
+      println(s"\nStarting Louvain level $level")
+
+      // label each vertex with its best community choice at this level of compression
+      val (currentQ,currentGraph,passes) = LouvainCore.louvain(sc, louvainGraph,minProgress,progressCounter)
+      louvainGraph.unpersistVertices(blocking=false)
+      louvainGraph=currentGraph
+
+      saveLevel(sc,level,currentQ,louvainGraph)
+
+      // If modularity was increased by at least 0.001, compress the graph and repeat;
+      // halt immediately if the community labeling took less than 3 passes
+      //println(s"if ($passes > 2 && $currentQ > $q + 0.001 )")
+      if (passes > 2 && currentQ > q + 0.001 ){
+        q = currentQ
+        louvainGraph = LouvainCore.compressGraph(louvainGraph)
+      }
+      else {
+        halt = true
+      }
+
+    } while ( !halt )
+    finalSave(sc,level,q,louvainGraph)
+  }
+
+  /**
+   * Save the graph at the given level of compression with community labels
+   * level 0 = no compression
+   *
+   * override to specify save behavior
+   */
+  def saveLevel(sc:SparkContext,level:Int,q:Double,graph:Graph[VertexState,Long]) = {
+
+  }
+
+  /**
+   * Complete any final save actions required
+   *
+   * override to specify save behavior
+   */
+  def finalSave(sc:SparkContext,level:Int,q:Double,graph:Graph[VertexState,Long]) = {
+
+  }
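+
+  // Typical usage, as wired up in Main.scala:
+  //   val runner = new HDFSLouvainRunner(minProgress, progressCounter, outputdir)
+  //   runner.run(sc, graph)  // graph: Graph[VD,Long], e.g. from Graph.fromEdges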
+
+}
\ No newline at end of file
diff --git a/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/Main.scala b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/Main.scala
new file mode 100644
index 0000000..c80b650
--- /dev/null
+++ b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/Main.scala
@@ -0,0 +1,124 @@
+package com.soteradefense.dga.graphx.louvain
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.graphx._
+
+// specify command line options and their defaults
+case class Config(
+    input:String = "",
+    output: String = "",
+    master:String="local",
+    appName:String="graphX analytic",
+    jars:String="",
+    sparkHome:String="",
+    parallelism:Int = -1,
+    edgedelimiter:String = ",",
+    minProgress:Int = 2000,
+    progressCounter:Int = 1,
+    ipaddress: Boolean = false,
+    properties:Seq[(String,String)]= Seq.empty[(String,String)] )
+
+/**
+ * Execute the louvain distributed community detection.
+ * Requires an edge file and output directory in hdfs (local files for local mode only)
+ */
+object Main {
+
+  def main(args: Array[String]) {
+
+    // Parse command line options
+    val parser = new scopt.OptionParser[Config](this.getClass().toString()){
+      opt[String]('i',"input") action {(x,c)=> c.copy(input=x)} text("input file or path  Required.")
+      opt[String]('o',"output") action {(x,c)=> c.copy(output=x)} text("output path Required")
+      opt[String]('m',"master") action {(x,c)=> c.copy(master=x)} text("spark master, local[N] or spark://host:port default=local")
+      opt[String]('h',"sparkhome") action {(x,c)=> c.copy(sparkHome=x)} text("SPARK_HOME Required to run on cluster")
+      opt[String]('n',"jobname") action {(x,c)=> c.copy(appName=x)} text("job name")
+      opt[Int]('p',"parallelism") action {(x,c)=> c.copy(parallelism=x)} text("sets spark.default.parallelism and minSplits on the edge file. default=based on input partitions")
+      opt[Int]('x',"minprogress") action {(x,c)=> c.copy(minProgress=x)} text("Number of vertices that must change communities for the algorithm to consider progress. default=2000")
+      opt[Int]('y',"progresscounter") action {(x,c)=> c.copy(progressCounter=x)} text("Number of times the algorithm can fail to make progress before exiting. default=1")
+      opt[String]('d',"edgedelimiter") action {(x,c)=> c.copy(edgedelimiter=x)} text("specify input file edge delimiter. default=\",\"")
+      opt[String]('j',"jars") action {(x,c)=> c.copy(jars=x)} text("comma separated list of jars")
+      opt[Boolean]('z',"ipaddress") action {(x,c)=> c.copy(ipaddress=x)} text("Set to true to convert IP addresses to Long ids. Defaults to false")
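+      // any trailing <name>=<value> arguments (e.g. spark.executor.memory=2g) are
+      // collected here and applied via System.setProperty before the SparkContext is created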
+      arg[(String,String)]("<property>=<value>....") unbounded() optional() action {case((k,v),c)=> c.copy(properties = c.properties :+ (k,v)) }
+    }
+    var edgeFile, outputdir, master, jobname, jars, sparkhome, edgedelimiter = ""
+    var properties:Seq[(String,String)]= Seq.empty[(String,String)]
+    var parallelism,minProgress,progressCounter = -1
+    var ipaddress = false
+    parser.parse(args,Config()) map {
+      config =>
+        edgeFile = config.input
+        outputdir = config.output
+        master = config.master
+        jobname = config.appName
+        jars = config.jars
+        sparkhome = config.sparkHome
+        properties = config.properties
+        parallelism = config.parallelism
+        edgedelimiter = config.edgedelimiter
+        minProgress = config.minProgress
+        progressCounter = config.progressCounter
+        ipaddress = config.ipaddress
+        if (edgeFile == "" || outputdir == "") {
+          println(parser.usage)
+          sys.exit(1)
+        }
+    } getOrElse{
+      sys.exit(1)
+    }
+
+    // set system properties
+    properties.foreach( {case (k,v)=>
+      println(s"System.setProperty($k, $v)")
+      System.setProperty(k, v)
+    })
+
+    // Create the spark context
+    var sc: SparkContext = null
+    if (master.indexOf("local") == 0 ){
+      println(s"sparkcontext = new SparkContext($master,$jobname)")
+      sc = new SparkContext(master, jobname)
+    }
+    else{
+      println(s"sparkcontext = new SparkContext($master,$jobname,$sparkhome,$jars)")
+      sc = new SparkContext(master,jobname,sparkhome,jars.split(","))
+    }
+
+    // read the input into a distributed edge list
+    val inputHashFunc = if (ipaddress) (id:String) => IpAddress.toLong(id) else (id:String) => id.toLong
+    var edgeRDD = sc.textFile(edgeFile).map(row=> {
+      val tokens = row.split(edgedelimiter).map(_.trim())
+      tokens.length match {
+        case 2 => {new Edge(inputHashFunc(tokens(0)),inputHashFunc(tokens(1)),1L) }
+        case 3 => {new Edge(inputHashFunc(tokens(0)),inputHashFunc(tokens(1)),tokens(2).toLong)}
+        case _ => {throw new IllegalArgumentException("invalid input line: "+row)}
+      }
+    })
+
+    // if the parallelism option was set map the input to the correct number of partitions,
+    // otherwise parallelism will be based on the number of HDFS blocks
+    if (parallelism != -1 ) edgeRDD = edgeRDD.coalesce(parallelism,shuffle=true)
+
+    // create the graph
+    val graph = Graph.fromEdges(edgeRDD, None)
+
+    // use a helper class to execute the louvain
+    // algorithm and save the output.
+    // to change the outputs you can extend LouvainHarness (see HDFSLouvainRunner)
+    val runner = new HDFSLouvainRunner(minProgress,progressCounter,outputdir)
+    runner.run(sc, graph)
+
+  }
+
+}
diff --git a/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/VertexState.scala b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/VertexState.scala
new file mode 100644
index 0000000..0bddd31
--- /dev/null
+++ b/dga-graphx/src/main/scala/com/soteradefense/dga/graphx/louvain/VertexState.scala
@@ -0,0 +1,19 @@
+package com.soteradefense.dga.graphx.louvain
+
+/**
+ * Louvain vertex state.
+ * Contains all information needed for louvain community detection
+ */
+class VertexState extends Serializable{
+
+  var community = -1L
+  var communitySigmaTot = 0L
+  var internalWeight = 0L  // sum of weights of self edges
+  var nodeWeight = 0L      // weighted degree (sum of incident edge weights)
+  var changed = false
+
+  override def toString(): String = {
+    "{community:"+community+",communitySigmaTot:"+communitySigmaTot+
+    ",internalWeight:"+internalWeight+",nodeWeight:"+nodeWeight+"}"
+  }
+}
\ No newline at end of file
diff --git a/dga-graphx/src/test/scala/com/soteradefense/dga/PLACEHOLDER.md b/dga-graphx/src/test/scala/com/soteradefense/dga/PLACEHOLDER.md
new file mode 100644
index 0000000..e69de29