// =====================================================================================
// Patch 1/2 + 2/2 (luluorta): max/min degree and degree distribution for GraphX.
//
// Part A — additions to graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala,
// members of class GraphOps[VD: ClassTag, ED: ClassTag].
//
// NOTE(review): the patch inserts
//   import org.apache.spark.graphx.util.FrequencyDistribution
// ABOVE the scala.* imports; per the Spark style guide, scala imports come first,
// followed by a blank line, then org.apache.* imports.
// =====================================================================================

  /**
   * Joins `degreeRDD` against the full vertex set, treating vertices absent from it
   * (i.e. with no adjacent edges in the relevant direction) as having degree 0.
   *
   * Shared by the min/max degree values and by [[getDegreeDist]] so that all of them
   * agree on the "missing vertex == degree 0" convention. (Previously only the min
   * variants joined, so e.g. `maxInDegree` threw on a graph with vertices but no
   * edges while `minInDegree` returned 0.)
   */
  private def fullDegrees(degreeRDD: VertexRDD[Int]): VertexRDD[Int] =
    graph.vertices.leftJoin(degreeRDD)((id, data, degree) => degree.getOrElse(0))

  /**
   * The maximum in-degree over all vertices in the graph.
   *
   * Returns 0 for an edge-less graph; throws (empty `reduce`) if the graph has no
   * vertices at all.
   */
  @transient lazy val maxInDegree: Int =
    fullDegrees(inDegrees).reduce((a, b) => if (a._2 > b._2) a else b)._2

  /**
   * The minimum in-degree over all vertices in the graph (0 if any vertex has no
   * incoming edges). Throws if the graph has no vertices.
   */
  @transient lazy val minInDegree: Int =
    fullDegrees(inDegrees).reduce((a, b) => if (a._2 < b._2) a else b)._2

  /**
   * The maximum out-degree over all vertices in the graph.
   * Same empty/edge-less semantics as [[maxInDegree]].
   */
  @transient lazy val maxOutDegree: Int =
    fullDegrees(outDegrees).reduce((a, b) => if (a._2 > b._2) a else b)._2

  /**
   * The minimum out-degree over all vertices in the graph.
   * Same empty/edge-less semantics as [[minInDegree]].
   */
  @transient lazy val minOutDegree: Int =
    fullDegrees(outDegrees).reduce((a, b) => if (a._2 < b._2) a else b)._2

  /**
   * The maximum degree (in + out) over all vertices in the graph.
   * Same empty/edge-less semantics as [[maxInDegree]].
   */
  @transient lazy val maxDegree: Int =
    fullDegrees(degrees).reduce((a, b) => if (a._2 > b._2) a else b)._2

  /**
   * The minimum degree (in + out) over all vertices in the graph.
   * Same empty/edge-less semantics as [[minInDegree]].
   */
  @transient lazy val minDegree: Int =
    fullDegrees(degrees).reduce((a, b) => if (a._2 < b._2) a else b)._2

  /**
   * The distribution of total (in + out) vertex degrees.
   *
   * @param frequencyCounter strategy that buckets degrees into ranges
   * @return one `((rangeStart, rangeEnd), count)` entry per range
   */
  def degreeDist(frequencyCounter: FrequencyDistribution): Array[((Int, Int), Long)] =
    getDegreeDist(EdgeDirection.Either, frequencyCounter)

  /**
   * The distribution of vertex in-degrees.
   *
   * @param frequencyCounter strategy that buckets degrees into ranges
   * @return one `((rangeStart, rangeEnd), count)` entry per range
   */
  def inDegreeDist(frequencyCounter: FrequencyDistribution): Array[((Int, Int), Long)] =
    getDegreeDist(EdgeDirection.In, frequencyCounter)

  /**
   * The distribution of vertex out-degrees.
   *
   * @param frequencyCounter strategy that buckets degrees into ranges
   * @return one `((rangeStart, rangeEnd), count)` entry per range
   */
  def outDegreeDist(frequencyCounter: FrequencyDistribution): Array[((Int, Int), Long)] =
    getDegreeDist(EdgeDirection.Out, frequencyCounter)

  /**
   * Computes the degree distribution for the requested edge direction.
   *
   * Vertices with no adjacent edges in that direction count as degree 0 (via
   * [[fullDegrees]]). `EdgeDirection.Either` and any other direction fall into the
   * total-degree branch.
   */
  private def getDegreeDist(
      edgeDirection: EdgeDirection,
      frequencyCounter: FrequencyDistribution): Array[((Int, Int), Long)] = {
    val (typedDegrees, max, min) = edgeDirection match {
      case EdgeDirection.In => (fullDegrees(inDegrees), maxInDegree, minInDegree)
      case EdgeDirection.Out => (fullDegrees(outDegrees), maxOutDegree, minOutDegree)
      case _ => (fullDegrees(degrees), maxDegree, minDegree)
    }
    frequencyCounter.compute(typedDegrees.values, max, min)
  }

// =====================================================================================
// Part B — new file:
// graphx/src/main/scala/org/apache/spark/graphx/util/FrequencyDistribution.scala
// =====================================================================================

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.graphx.util

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD

/**
 * Computes a frequency distribution (histogram) of an `RDD[Int]` over a set of integer
 * ranges. Concrete strategies decide how the ranges are derived from the data's min and
 * max; obtain instances through the factory methods on the companion object
 * (`divide`, `step`, `sliding`, `log10`, or `apply` with user-supplied ranges).
 */
trait FrequencyDistribution extends Serializable {

  // True when getRanges is guaranteed to return sorted, non-overlapping ranges,
  // which lets each value be bucketed with a binary search instead of a linear scan.
  protected var isSortedDisjointRanges: Boolean = false

  /**
   * Counts how many elements of `numbers` fall into each range derived from
   * `[min, max]`.
   *
   * @param numbers the values to bucket; must be non-empty (the distributed `reduce`
   *                has nothing to combine otherwise)
   * @param max the maximum value in `numbers`
   * @param min the minimum value in `numbers`
   * @return one `((rangeStart, rangeEnd), count)` entry per range, in range order
   */
  def compute(numbers: RDD[Int], max: Int, min: Int): Array[((Int, Int), Long)] = {
    assert(max >= min)
    val ranges = getRanges(max, min)
    assert(ranges.forall(range => range._1 <= range._2))
    val frequencies = numbers
      .mapPartitions(getFrequencyFunc(ranges, max, min))
      .reduce((a, b) => a.zip(b).map(pair => pair._1 + pair._2))
    ranges.zip(frequencies)
  }

  /**
   * Convenience overload that first computes min and max from the data
   * (one extra pass over `numbers`).
   */
  def compute(numbers: RDD[Int]): Array[((Int, Int), Long)] = {
    val (max, min) = numbers.mapPartitions { iter =>
      // Empty partitions must contribute nothing. The previous version emitted (0, 0)
      // for them, which silently pulled min up to 0 for all-positive data (and max
      // down to 0 for all-negative data).
      if (iter.isEmpty) {
        Iterator.empty
      } else {
        var localMax = iter.next()
        var localMin = localMax
        while (iter.hasNext) {
          val num = iter.next()
          if (num > localMax) localMax = num
          else if (num < localMin) localMin = num
        }
        Iterator((localMax, localMin))
      }
    }.reduce { (a, b) =>
      (math.max(a._1, b._1), math.min(a._2, b._2))
    }
    compute(numbers, max, min)
  }

  /**
   * Builds the per-partition counting function: it consumes a partition's values and
   * emits a single array of per-range counts (later summed element-wise across
   * partitions).
   */
  protected def getFrequencyFunc(
      ranges: Array[(Int, Int)],
      max: Int,
      min: Int): Iterator[Int] => Iterator[Array[Long]] = {
    if (isSortedDisjointRanges) {
      // Sorted, disjoint ranges: O(log #ranges) binary search per value.
      (iter: Iterator[Int]) => {
        val frequencies = new Array[Long](ranges.length)
        while (iter.hasNext) {
          val num = iter.next()
          val target = binaryRangeSearch(ranges, num)
          if (target != -1) frequencies(target) += 1
        }
        Iterator(frequencies)
      }
    } else {
      // Ranges may overlap (e.g. sliding windows) or be unsorted: test every range,
      // counting the value once per range that contains it.
      (iter: Iterator[Int]) => {
        val frequencies = new Array[Long](ranges.length)
        while (iter.hasNext) {
          val num = iter.next()
          var i = 0
          while (i < ranges.length) {
            if (num >= ranges(i)._1 && num <= ranges(i)._2) frequencies(i) += 1
            i += 1
          }
        }
        Iterator(frequencies)
      }
    }
  }

  /**
   * Binary search over sorted, disjoint ranges: returns the index of the range
   * containing `num`, or -1 if no range contains it.
   */
  private def binaryRangeSearch(ranges: Array[(Int, Int)], num: Int): Int = {
    var left = 0
    var right = ranges.length - 1
    var found = -1
    while (found == -1 && left <= right) {
      val middle = (left + right) >>> 1 // unsigned shift avoids (left + right) overflow
      if (num > ranges(middle)._2) left = middle + 1
      else if (num < ranges(middle)._1) right = middle - 1
      else found = middle
    }
    found
  }

  /** Derives the ranges to count over, given the data's max and min. */
  protected def getRanges(max: Int, min: Int): Array[(Int, Int)]
}

object FrequencyDistribution {

  /**
   * Uses exactly the caller-supplied ranges. `sortedDisjoint` is trusted, not
   * verified: passing true for overlapping or unsorted ranges produces wrong counts.
   */
  private case class UserDefinedRanges(
      ranges: Array[(Int, Int)],
      sortedDisjoint: Boolean = false) extends FrequencyDistribution {
    isSortedDisjointRanges = sortedDisjoint

    def getRanges(max: Int, min: Int): Array[(Int, Int)] = ranges
  }

  /** Splits `[min, max]` into `numRanges` contiguous ranges of near-equal size. */
  private case class DividedRanges(numRanges: Int) extends FrequencyDistribution {
    isSortedDisjointRanges = true

    def getRanges(max: Int, min: Int): Array[(Int, Int)] = {
      val span = max - min + 1
      val initSize = span / numRanges
      val sizes = Array.fill(numRanges)(initSize)
      // Spread the remainder, one extra slot each, over the leading ranges.
      val remainder = span - numRanges * initSize
      for (i <- 0 until remainder) {
        sizes(i) += 1
      }
      assert(sizes.sum == span)
      val ranges = ArrayBuffer.empty[(Int, Int)]
      var start = min
      // Zero-sized entries (possible when numRanges > span) are dropped.
      sizes.filter(_ > 0).foreach { size =>
        val end = start + size - 1
        ranges += ((start, end))
        start = end + 1
      }
      assert(start == max + 1)
      ranges.toArray
    }
  }

  /**
   * Contiguous fixed-width ranges of `stepSize`, starting at `startFrom` (or the
   * data's min) and ending at the first range that covers max. Values below the
   * start are not counted.
   */
  private case class StepRanges(
      stepSize: Int,
      startFrom: Option[Int] = None) extends FrequencyDistribution {
    isSortedDisjointRanges = true

    def getRanges(max: Int, min: Int): Array[(Int, Int)] = {
      // NOTE(review): assumes start + stepSize does not overflow Int near
      // Int.MaxValue — TODO confirm expected input range.
      var start = startFrom.getOrElse(min)
      val ranges = ArrayBuffer.empty[(Int, Int)]
      while (start <= max) {
        val end = start + stepSize - 1
        ranges += ((start, end))
        start = end + 1
      }
      ranges.toArray
    }
  }

  /**
   * Possibly-overlapping windows of `windowSize`, each advanced by `slidingSize`.
   * Windows can overlap, so the linear-scan counting path is used
   * (`isSortedDisjointRanges` stays false) and a value may be counted in several
   * windows.
   */
  private case class SlidingWindows(
      windowSize: Int,
      slidingSize: Int,
      startFrom: Option[Int] = None) extends FrequencyDistribution {

    def getRanges(max: Int, min: Int): Array[(Int, Int)] = {
      // NOTE(review): assumes start + windowSize does not overflow Int near
      // Int.MaxValue — TODO confirm expected input range.
      var start = startFrom.getOrElse(min)
      val ranges = ArrayBuffer.empty[(Int, Int)]
      while (start <= max) {
        val end = start + windowSize - 1
        ranges += ((start, end))
        start += slidingSize
      }
      ranges.toArray
    }
  }

  /**
   * Decimal order-of-magnitude buckets: [0, 9], [10, 99], [100, 999], ... up to the
   * data's max. Negative values fall outside every bucket and are not counted; the
   * [0, 9] bucket is always present even when max < 10.
   */
  private case object Log10 extends FrequencyDistribution {
    isSortedDisjointRanges = true

    def getRanges(max: Int, min: Int): Array[(Int, Int)] = {
      val ranges = ArrayBuffer.empty[(Int, Int)]
      ranges += ((0, 9))
      var start = 10
      // The `start > 0` guard terminates the loop after the clamped final bucket,
      // where `start = end + 1` wraps negative.
      while (start <= max && start > 0) {
        // Clamp the last bucket so `start * 10` cannot overflow Int (max >= 1e9
        // previously produced an invalid negative-end range).
        val end = if (start > Int.MaxValue / 10) Int.MaxValue else start * 10 - 1
        ranges += ((start, end))
        start = end + 1
      }
      ranges.toArray
    }
  }

  /**
   * A distribution over caller-supplied ranges. Set `sortedDisjoint` to true only if
   * `ranges` are sorted and non-overlapping (enables binary-search bucketing).
   */
  def apply(ranges: Array[(Int, Int)], sortedDisjoint: Boolean = false): FrequencyDistribution =
    new UserDefinedRanges(ranges, sortedDisjoint)

  /** Buckets by decimal order of magnitude: [0, 9], [10, 99], [100, 999], ... */
  def log10(): FrequencyDistribution = Log10

  /** Splits the data's `[min, max]` into `numRanges` near-equal contiguous buckets. */
  def divide(numRanges: Int): FrequencyDistribution = {
    require(numRanges > 0, s"numRanges must be positive, got $numRanges")
    new DividedRanges(numRanges)
  }

  /** Fixed-width buckets of `stepSize`, starting at the data's min. */
  def step(stepSize: Int): FrequencyDistribution = {
    require(stepSize > 0, s"stepSize must be positive, got $stepSize")
    new StepRanges(stepSize)
  }

  /** Fixed-width buckets of `stepSize`, starting at `startFrom`. */
  def step(stepSize: Int, startFrom: Int): FrequencyDistribution = {
    require(stepSize > 0, s"stepSize must be positive, got $stepSize")
    new StepRanges(stepSize, Some(startFrom))
  }

  /**
   * Overlapping windows of `windowSize` advanced by `slidingSize`, starting at the
   * data's min. Positivity checks (missing before) prevent an unbounded range-building
   * loop when `slidingSize <= 0`.
   */
  def sliding(windowSize: Int, slidingSize: Int): FrequencyDistribution = {
    require(windowSize > 0, s"windowSize must be positive, got $windowSize")
    require(slidingSize > 0, s"slidingSize must be positive, got $slidingSize")
    new SlidingWindows(windowSize, slidingSize)
  }

  /** Overlapping windows of `windowSize` advanced by `slidingSize`, from `startFrom`. */
  def sliding(windowSize: Int, slidingSize: Int, startFrom: Int): FrequencyDistribution = {
    require(windowSize > 0, s"windowSize must be positive, got $windowSize")
    require(slidingSize > 0, s"slidingSize must be positive, got $slidingSize")
    new SlidingWindows(windowSize, slidingSize, Some(startFrom))
  }
}