GraphX benchmarking changes for VLDB
1. In-memory shuffle
2. Checking Iterator.hasNext before calling next() on partition iterators (see the sketch after this list)
3. In-place/destructive updates in vertex replication
4. WikiPipelineBenchmark
5. K-Core
6. Dataflow (naive Spark) PageRank
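
Item 2 guards single-element partition iterators that can turn out to be empty (for example after coalescing); the EdgeRDD hunks later in this diff apply exactly this pattern. The following is a minimal, self-contained sketch of the guard in plain Scala; it is an illustration only, not part of the commit:

```scala
// Minimal sketch of the Iterator.hasNext guard (summary item 2).
// Instead of assuming each partition iterator holds exactly one element,
// check hasNext and fall back to an empty iterator for empty partitions.
object HasNextGuardSketch {
  def mapSinglePartition[A, B](iter: Iterator[A])(f: A => B): Iterator[B] =
    if (iter.hasNext) Iterator(f(iter.next()))  // normal case: one element per partition
    else Iterator.empty                         // empty partition: previously a NoSuchElementException

  def main(args: Array[String]): Unit = {
    println(mapSinglePartition(Iterator(41))(_ + 1).toList)                  // List(42)
    println(mapSinglePartition(Iterator.empty: Iterator[Int])(_ + 1).toList) // List()
  }
}
```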

Squashed commit of the following:

commit ad6590c
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Mon Apr 21 17:56:24 2014 -0700

    Add unpersist option to Pregel

commit 3bcaa2f
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Wed Apr 30 05:58:11 2014 +0000

    Added faster version of kcore and fixed compile issue with dataflow pagerank

commit 03f5d76
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Wed Apr 30 02:22:13 2014 +0000

    Added more connected components.

commit cfd209b
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Wed Apr 30 00:01:24 2014 +0000

    Initial CC dataflow implementation.

commit 255db45
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Tue Apr 29 19:38:26 2014 +0000

    Updated pregel logging.

commit 5032924
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Tue Apr 29 08:52:07 2014 +0000

    Added dataflow pagerank. Having issues with it and in-memory shuffle.

commit f483ca4
Merge: 8d22500 9e59642
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Tue Apr 29 00:05:49 2014 -0700

    Merge pull request apache#3 from dcrankshaw/osdi_with_kcore_for_merge

    Osdi with kcore for merge

commit 9e59642
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Mon Apr 28 20:51:24 2014 +0000

    Added more logging.

    Conflicts:
    	graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala

commit 76a6a54
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sun Apr 13 03:00:43 2014 +0000

    Fixed kcore. Now works.

commit 1e924df
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sun Apr 13 00:06:40 2014 +0000

    Changed kcore filenames.

commit a641dc1
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sat Apr 12 01:59:17 2014 +0000

    Kcore debugging.

commit dbe5180
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Fri Apr 11 22:49:38 2014 +0000

    Fixed results of cherry-pick

commit 4ddc552
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Tue Jan 7 13:44:46 2014 -0800

    Added kcore implementation.

    Conflicts:
    	graph/src/main/scala/org/apache/spark/graph/Pregel.scala

commit 250199a
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Mon Apr 28 22:23:49 2014 +0000

    Minor logging changes.

commit 8d22500
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Apr 25 16:34:53 2014 -0700

    EdgePartition.size should be val

commit bb36cc8
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Apr 25 03:08:27 2014 -0700

    Set locality wait

commit ad4c874
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Sat Apr 19 20:08:57 2014 -0700

    In GraphLoader, coalesce to minEdgePartitions

commit 2898126
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Sat Apr 19 20:10:36 2014 -0700

    In Analytics, take PageRank numIter

commit 719b04a
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Sat Apr 19 20:09:24 2014 -0700

    Log current Pregel iteration

commit 8609184
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 28 15:15:44 2014 -0700

    Clean up WikiPipelineBenchmark

commit e8be08e
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 28 15:14:50 2014 -0700

    for-loop to while-loop in WikiArticle

commit 540c267
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 28 15:13:56 2014 -0700

    Make mutable RDDs optional

commit e27da44
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 28 15:12:58 2014 -0700

    Check for empty iterators

commit 5ec645d
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 28 15:10:41 2014 -0700

    In memory shuffle (cherry-picked from amplab/graphx#135)

commit f36e576
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 28 15:09:23 2014 -0700

    Log warning on partition recompute

commit c289e91
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 28 15:08:06 2014 -0700

    Untrack conf files

commit a61be7a
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Tue Mar 25 07:03:41 2014 +0000

    More timing logging and fixing the iterator issue.

commit 0645183
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Mon Mar 24 21:34:00 2014 +0000

    Fixed IO bugs.

commit a13f3b9
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Mon Mar 24 19:52:46 2014 +0000

    Fixed compile errors

commit e6fc93b
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Mon Mar 24 03:29:19 2014 +0000

    Updated pipeline benchmark to handle other systems.

commit 0a04cb9
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sun Mar 23 20:57:08 2014 +0000

    Renamed prepostprocess

commit a262b07
Merge: 470a950 3342751
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 21 11:39:02 2014 -0700

    Merge remote-tracking branch 'dcrankshaw-incubator-spark/gx-vldb-bench' into vldb

    Conflicts:
    	project/SparkBuild.scala

commit 470a950
Merge: e09139d 3407a8f
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Fri Mar 21 11:30:12 2014 -0700

    Merge branch 'mutable-rdd' into vldb

commit 3342751
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sun Mar 2 02:14:46 2014 +0000

    Fixed partitioning issues.

commit 3407a8f
Author: Ankur Dave <ankurdave@gmail.com>
Date:   Sat Mar 1 16:39:09 2014 -0800

    Try mutating old RDDs for delta updates

commit fad630f
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Wed Feb 26 02:33:01 2014 +0000

    More debugging. Seems like we have empty partitions.

commit b788bfb
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Tue Feb 25 11:10:16 2014 +0000

    Added more debug logging.

commit 75b2da7
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Tue Feb 25 09:47:32 2014 +0000

    Fixed pipeline. Now seems to work well.

commit 7799b3e
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Mon Feb 24 22:31:47 2014 +0000

    Updated slaves and conf.

commit 95e19ea
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sun Feb 23 06:19:21 2014 +0000

    Re-added prepost

commit da93886
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sun Feb 23 06:13:24 2014 +0000

    Stringify still not working. FML.

commit 6a4e492
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sun Feb 23 03:34:00 2014 +0000

    Added debugging. Issue with stringify.

commit 99d2713
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Sat Feb 22 04:40:45 2014 +0000

    Added new conf

commit 7a036cb
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Fri Feb 21 20:35:37 2014 -0800

    Fixed compilation issues.

commit d3bbfd0
Author: Dan Crankshaw <dscrankshaw@gmail.com>
Date:   Wed Feb 19 06:15:14 2014 +0000

    Beginning of new pipeline, not compiled yet.

commit 2983132
Author: root <root@ip-10-78-213-7.ec2.internal>
Date:   Wed Feb 19 01:23:15 2014 +0000

    Initial vldb benchmark commit. Added slaves file.
ankurdave committed Aug 19, 2014
1 parent e09139d commit faa4073
Showing 26 changed files with 1,175 additions and 181 deletions.
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDDCheckpointData
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import java.nio.ByteBuffer

private[spark] object ShuffleMapTask {

@@ -168,7 +169,11 @@ private[spark] class ShuffleMapTask(
var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
// writer.commit()
val bytes = writer.commit()
if (bytes != null) {
blockManager.putBytes(writer.blockId, ByteBuffer.wrap(bytes), StorageLevel.MEMORY_ONLY_SER, tellMaster = false)
}
writer.close()
val size = writer.fileSegment().length
totalBytes += size
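
The ShuffleMapTask hunk above, together with the BlockObjectWriter and BlockManager changes further down, implements summary item 1 (in-memory shuffle): commit() now returns the serialized bytes, which are stored in the block manager's memory store rather than synced to a shuffle file. The toy sketch below illustrates only the idea; the class and method names are illustrative, not Spark's API, and the code is not part of the commit:

```scala
import java.io.ByteArrayOutputStream
import scala.collection.mutable

// Toy sketch of an in-memory shuffle store: writers buffer serialized records in
// memory, and commit() publishes the bytes into a shared map keyed by block id,
// so readers can fetch them without touching disk.
object InMemoryShuffleSketch {
  private val blocks = mutable.Map[String, Array[Byte]]()

  class InMemoryBlockWriter(blockId: String) {
    private val buffer = new ByteArrayOutputStream()
    def write(record: String): Unit = buffer.write((record + "\n").getBytes("UTF-8"))
    // Like the patched BlockObjectWriter.commit(), this returns the buffered bytes.
    def commit(): Array[Byte] = {
      val bytes = buffer.toByteArray
      blocks(blockId) = bytes
      bytes
    }
  }

  def getBlock(blockId: String): Option[Array[Byte]] = blocks.get(blockId)

  def main(args: Array[String]): Unit = {
    val writer = new InMemoryBlockWriter("shuffle_0_1_2")
    writer.write("a,1")
    writer.write("b,2")
    writer.commit()
    println(getBlock("shuffle_0_1_2").map(bytes => new String(bytes, "UTF-8")))
  }
}
```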
@@ -57,7 +57,7 @@ private[spark] class BlockManager(

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val memoryStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)

// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
@@ -293,7 +293,7 @@ private[spark] class BlockManager(
* never deletes (recent) items.
*/
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
diskStore.getValues(blockId, serializer).orElse(
memoryStore.getValues(blockId, serializer).orElse(
sys.error("Block " + blockId + " not found on disk, though it should be"))
}

@@ -313,7 +313,7 @@
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
diskStore.getBytes(blockId) match {
memoryStore.getBytes(blockId) match {
case Some(bytes) =>
Some(bytes)
case None =>
@@ -831,7 +831,7 @@ private[spark] class BlockManager(
if (info != null) info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromDisk = false //diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning("Block " + blockId + " could not be removed as it was not found in either " +
"the disk or memory store")
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.{FileOutputStream, File, OutputStream}
import java.io.{ByteArrayOutputStream, FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -44,7 +44,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
* Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit.
*/
def commit(): Long
def commit(): Array[Byte]

/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
@@ -106,7 +106,7 @@ private[spark] class DiskBlockObjectWriter(
/** The file channel, used for repositioning / truncating the file. */
private var channel: FileChannel = null
private var bs: OutputStream = null
private var fos: FileOutputStream = null
private var fos: ByteArrayOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private val initialPosition = file.length()
@@ -115,9 +115,8 @@
private var _timeWriting = 0L

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
fos = new ByteArrayOutputStream()
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
@@ -130,9 +129,6 @@
if (syncWrites) {
// Force outstanding writes to disk and track how long it takes
objOut.flush()
val start = System.nanoTime()
fos.getFD.sync()
_timeWriting += System.nanoTime() - start
}
objOut.close()

@@ -149,18 +145,18 @@

override def isOpen: Boolean = objOut != null

override def commit(): Long = {
override def commit(): Array[Byte] = {
if (initialized) {
// NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
lastValidPosition = channel.position()
lastValidPosition - prevPos
lastValidPosition = fos.size()
fos.toByteArray
} else {
// lastValidPosition is zero if stream is uninitialized
lastValidPosition
null
}
}

@@ -170,7 +166,7 @@
// truncate the file to the last valid position.
objOut.flush()
bs.flush()
channel.truncate(lastValidPosition)
throw new UnsupportedOperationException("Revert temporarily broken due to in memory shuffle code changes.")
}
}

@@ -182,7 +178,7 @@
}

override def fileSegment(): FileSegment = {
new FileSegment(file, initialPosition, bytesWritten)
new FileSegment(null, initialPosition, bytesWritten)
}

// Only valid if called after close()
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.serializer.Serializer

/**
* Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
@@ -119,6 +120,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

/**
* A version of getValues that allows a custom serializer. This is used as part of the
* shuffle short-circuit code.
*/
def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
}

override def remove(blockId: BlockId): Boolean = {
entries.synchronized {
val entry = entries.remove(blockId)
@@ -187,6 +187,17 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
}
})
}

def removeAllShuffleStuff() {
for (state <- shuffleStates.values;
group <- state.allFileGroups;
(mapId, _) <- group.mapIdToIndex.iterator;
reducerId <- 0 until group.files.length) {
val blockId = new ShuffleBlockId(group.shuffleId, mapId, reducerId)
blockManager.removeBlock(blockId, tellMaster = false)
}
shuffleStates.clear()
}
}

private[spark]
@@ -200,7 +211,7 @@ object ShuffleBlockManager {
* Stores the absolute index of each mapId in the files of this group. For instance,
* if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
*/
private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

/**
* Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
@@ -20,7 +20,7 @@ package org.apache.spark.graphx
/**
* The direction of a directed edge relative to a vertex.
*/
class EdgeDirection private (private val name: String) extends Serializable {
class EdgeDirection (private val name: String) extends Serializable {
/**
* Reverse the direction of an edge. An in becomes out,
* out becomes in and both and either remain the same.
25 changes: 19 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -45,7 +45,12 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
val partIter = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
if (partIter.hasNext) {
partIter.next._2.iterator
} else {
Iterator.empty
}
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -69,8 +74,12 @@
private[graphx] def mapEdgePartitions[ED2: ClassTag](
f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
} else {
Iterator.empty
}
}, preservesPartitioning = true))
}

@@ -107,9 +116,13 @@ class EdgeRDD[@specialized ED: ClassTag](
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
if (thisIter.hasNext && otherIter.hasNext) {
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
} else {
Iterator.empty
}
})
}

2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -335,7 +335,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
* }
* }}}
*/
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)], destructive: Boolean = false)
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]

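
The hunk above adds a destructive flag to Graph.outerJoinVertices, which is how summary item 3 (in-place/destructive updates in vertex replication) surfaces in the public API. A hedged usage sketch follows, assuming destructive = true merely allows the join to overwrite the existing vertex attributes in place; the edge-list path and update values are hypothetical, and this snippet is not part of the commit:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Hedged usage sketch of the destructive outerJoinVertices flag added above.
// Assumption: destructive = true lets the join mutate the current vertex
// attributes in place instead of building new vertex partitions.
object DestructiveJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "DestructiveJoinSketch")
    // Hypothetical edge-list file: one "srcId dstId" pair per line.
    val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "/tmp/edges.txt")

    // Hypothetical per-vertex updates to join in.
    val updates: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 10), (2L, 20)))

    // Keeping VD2 == VD (both Int) so an in-place overwrite is at least plausible.
    val updated = graph.outerJoinVertices(updates, destructive = true) {
      (vid, oldAttr, newOpt) => newOpt.getOrElse(oldAttr)
    }
    println(updated.vertices.count())
    sc.stop()
  }
}
```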
@@ -20,9 +20,15 @@ package org.apache.spark.graphx
import com.esotericsoftware.kryo.Kryo

import org.apache.spark.graphx.impl._
// import org.apache.spark.examples.graphx._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.BoundedPriorityQueue
import scala.collection.mutable
import org.apache.hadoop.io.{LongWritable, Text}
import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
// import org.apache.hadoop.conf.Configuration
import org.apache.mahout.text.wikipedia._

/**
* Registers GraphX classes with Kryo for improved performance.
@@ -41,6 +47,13 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[PartitionStrategy])
kryo.register(classOf[BoundedPriorityQueue[Object]])
kryo.register(classOf[EdgeDirection])
kryo.register(classOf[mutable.HashSet[VertexId]])
kryo.register(classOf[XmlInputFormat])
kryo.register(classOf[LongWritable])
kryo.register(classOf[Text])
kryo.register(classOf[WikiArticle])
// kryo.register(classOf[JHashSet[VertexId]])
kryo.register(classOf[JTreeSet[VertexId]])

// This avoids a large number of hash table lookups.
kryo.setReferences(false)
19 changes: 18 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.rdd.RDD

/**
* Provides utilities for loading [[Graph]]s from files.
@@ -60,7 +61,7 @@ object GraphLoader extends Logging {
val startTime = System.currentTimeMillis

// Parse the edge data table directly into edge partitions
val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
val edges = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[Int]
iter.foreach { line =>
if (!line.isEmpty && line(0) != '#') {
@@ -86,4 +87,20 @@
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
} // end of edgeListFile

def loadVertices(sc: SparkContext, vertexPath: String, delimiter: String = "\\s+"): RDD[(VertexId, String)] = {

val vertices = sc.textFile(vertexPath, 128).mapPartitions( iter =>
iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
val lineArray = line.split(delimiter)
if(lineArray.length < 2) {
println("Invalid line: " + line)
assert(false)
}
val id = lineArray(0).trim.toLong
val attr = lineArray.slice(1,lineArray.length).mkString(" ")
(id, attr)
})
vertices
}

}
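
For reference, here is a short usage sketch of the loadVertices helper added to GraphLoader above. It assumes a build with this patch applied; the file path is hypothetical, and each input line is expected to be a vertex id followed by a whitespace-separated attribute string. This snippet is not part of the commit:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx._

// Hedged usage sketch of GraphLoader.loadVertices as added in this commit.
// Each input line is expected to look like "<vertexId> <attribute text...>".
object LoadVerticesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "LoadVerticesSketch")
    // Hypothetical vertex file path.
    val vertices = GraphLoader.loadVertices(sc, "/tmp/vertices.txt")
    vertices.take(5).foreach { case (id, attr) => println(id + " -> " + attr) }
    sc.stop()
  }
}
```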
(Remaining changed files not shown.)