Commit
many new rdds
ryan-williams committed Jun 19, 2017
1 parent 9e6cc44 commit 6a23202
Showing 30 changed files with 986 additions and 259 deletions.
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1 +1 @@
addSbtPlugin("org.hammerlab" % "sbt-parent" % "1.7.2")
addSbtPlugin("org.hammerlab" % "sbt-parent" % "2.0.0-SNAPSHOT")
20 changes: 20 additions & 0 deletions src/main/scala/org/apache/spark/hadoop/Util.scala
@@ -0,0 +1,20 @@
package org.apache.spark.hadoop

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.InputMetrics

object Util {
def getFSBytesReadOnThreadCallback: Option[() => Long] =
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()

def setBytesRead(bytesRead: Long)(implicit inputMetrics: InputMetrics): Unit =
inputMetrics.setBytesRead(bytesRead)

def incRecordsRead(amount: Long = 1)(implicit inputMetrics: InputMetrics): Unit =
inputMetrics.incRecordsRead(amount)

def incBytesRead(amount: Long = 1)(implicit inputMetrics: InputMetrics): Unit =
inputMetrics.incBytesRead(amount)

val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS
}
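
A minimal usage sketch (not part of this commit) of the helpers above, assuming an InputMetrics instance is already in implicit scope; the object and method names below are hypothetical.

// Hypothetical sketch: update input metrics while consuming records.
import org.apache.spark.executor.InputMetrics
import org.apache.spark.hadoop.Util._

object InputMetricsExample {
  def consume(records: Iterator[String])(implicit metrics: InputMetrics): Unit = {
    val bytesReadCallback = getFSBytesReadOnThreadCallback  // Option[() => Long]
    var count = 0L
    for (_ ← records) {
      incRecordsRead()
      count += 1
      // Periodically refresh the bytes-read gauge, as Spark's own Hadoop RDDs do
      if (count % UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0)
        bytesReadCallback.foreach(cb ⇒ setBytesRead(cb()))
    }
  }
}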
20 changes: 20 additions & 0 deletions src/main/scala/org/hammerlab/collection/package.scala
@@ -0,0 +1,20 @@
package org.hammerlab

import scala.collection.generic.CanBuildFrom
import scala.collection.mutable

package object collection {

/**
* [[CanBuildFrom]] instance for constructing [[Array]]s, not provided by the standard library.
*/
implicit def canBuildArray[From] =
new CanBuildFrom[From, String, Array[String]] {
override def apply(from: From): mutable.Builder[String, Array[String]] =
mutable.ArrayBuilder.make[String]

override def apply(): mutable.Builder[String, Array[String]] =
mutable.ArrayBuilder.make[String]
}

}
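
A usage sketch (assumed, not in the commit): with the instance in scope, it can be passed to a collection transformation to build an Array[String] directly, much like scala.collection.breakOut. Note that as written the instance is fixed to String elements; the wrapping object below is hypothetical.

// Hypothetical sketch: build an Array[String] straight from a List transformation.
import org.hammerlab.collection._

object CanBuildArrayExample {
  val ids = List(1, 2, 3)
  val labels: Array[String] = ids.map(n ⇒ s"id-$n")(canBuildArray)  // Array("id-1", "id-2", "id-3")
}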
27 changes: 27 additions & 0 deletions src/main/scala/org/hammerlab/hadoop/FileSplit.scala
@@ -0,0 +1,27 @@
package org.hammerlab.hadoop

import org.apache.hadoop.mapreduce.lib.input

/**
* Case-class sugar over Hadoop [[input.FileSplit]]
*/
case class FileSplit(path: Path,
start: Long,
length: Long,
locations: Array[String])
extends input.FileSplit {
def end = start + length
}

object FileSplit {
def apply(split: input.FileSplit): FileSplit =
FileSplit(
split.getPath,
split.getStart,
split.getLength,
split.getLocations
)

implicit def conv(split: input.FileSplit): FileSplit =
apply(split)
}
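
A small sketch (assumed) of the case-class ergonomics this adds over the raw Hadoop type: pattern-matching and the derived `end` offset. The object and method names are hypothetical.

// Hypothetical sketch: destructure a converted split.
import org.apache.hadoop.mapreduce.lib.input
import org.hammerlab.hadoop.FileSplit

object FileSplitExample {
  def describe(hadoopSplit: input.FileSplit): String = {
    val split = FileSplit(hadoopSplit)        // or rely on the implicit `conv`
    val FileSplit(path, start, _, _) = split  // case-class extractor
    s"$path: [$start, ${split.end})"
  }
}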
77 changes: 77 additions & 0 deletions src/main/scala/org/hammerlab/hadoop/FileSplits.scala
@@ -0,0 +1,77 @@
package org.hammerlab.hadoop

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce
import org.apache.hadoop.mapreduce.lib.input
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat.{ SPLIT_MAXSIZE, setInputPaths }
import org.apache.hadoop.mapreduce.{ InputSplit, Job, TaskAttemptContext }

import scala.collection.JavaConverters._

case class MaxSplitSize(size: Long)

object MaxSplitSize {
implicit def makeMaxSplitSize(size: Long): MaxSplitSize = MaxSplitSize(size)
implicit def unmakeMaxSplitSize(size: MaxSplitSize): Long = size.size

val DEFAULT_MAX_SPLIT_SIZE = 32 * 1024 * 1024L

def apply(size: Option[Long] = None)(implicit conf: Configuration): MaxSplitSize =
MaxSplitSize(
size.getOrElse(
conf.getLong(
SPLIT_MAXSIZE,
DEFAULT_MAX_SPLIT_SIZE
)
)
)
}

object FileSplits {

trait Config {
def maxSplitSize: MaxSplitSize
}

object Config {
def apply(maxSplitSize: Long): Config = ConfigImpl(maxSplitSize)
def apply(maxSplitSize: Option[Long] = None)(implicit conf: Configuration): Config =
ConfigImpl(
MaxSplitSize(
maxSplitSize
)
)

implicit def default(implicit conf: Configuration) = apply()
}

private case class ConfigImpl(maxSplitSize: MaxSplitSize)
extends Config

def apply(path: Path,
conf: Configuration)(
implicit config: Config
): Seq[FileSplit] = {

val job = Job.getInstance(conf, s"$path:file-splits")

val jobConf = job.getConfiguration

jobConf.setLong(SPLIT_MAXSIZE, config.maxSplitSize)

setInputPaths(job, path)

val fif =
new input.FileInputFormat[Any, Any] {
// Hadoop API requires us to have a stub here, though it is not used
override def createRecordReader(split: InputSplit,
context: TaskAttemptContext): mapreduce.RecordReader[Any, Any] =
???
}

fif
.getSplits(job)
.asScala
.map(_.asInstanceOf[input.FileSplit]: FileSplit)
}
}
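
A usage sketch (assumed; the path and object name are hypothetical): with an implicit Configuration in scope, Config.default picks up SPLIT_MAXSIZE (or the 32MB default), and FileSplits returns the splits Hadoop's FileInputFormat would compute.

// Hypothetical sketch: compute file-splits for a path.
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.hammerlab.hadoop.{ FileSplit, FileSplits, Path }

object FileSplitsExample {
  implicit val conf = new Configuration
  val path = Path(new URI("hdfs:///data/example.sam"))  // hypothetical path

  val splits: Seq[FileSplit] = FileSplits(path, conf)   // implicit FileSplits.Config.default

  // Or cap the split size at 16MB explicitly:
  val smallSplits = FileSplits(path, conf)(FileSplits.Config(16 * 1024 * 1024L))
}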
14 changes: 14 additions & 0 deletions src/main/scala/org/hammerlab/hadoop/Path.scala
@@ -0,0 +1,14 @@
package org.hammerlab.hadoop

import java.net.URI

import org.apache.hadoop.fs

case class Path(uri: URI) {
override def toString: String = uri.toString
}

object Path {
implicit def fromHadoopPath(path: fs.Path): Path = Path(path.toUri)
implicit def toHadoopPath(path: Path): fs.Path = new fs.Path(path.uri)
}
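
A small sketch (assumed): the implicit conversions let a Path flow into Hadoop FileSystem APIs unchanged. The path and object name are hypothetical.

// Hypothetical sketch: round-trip between org.hammerlab.hadoop.Path and Hadoop's fs.Path.
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.hammerlab.hadoop.Path

object PathExample {
  val path = Path(new URI("file:///tmp/example.txt"))  // hypothetical path
  val fs = FileSystem.get(path.uri, new Configuration)
  val exists = fs.exists(path)                         // Path → fs.Path via toHadoopPath
}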
10 changes: 10 additions & 0 deletions src/main/scala/org/hammerlab/hadoop/Registrar.scala
@@ -0,0 +1,10 @@
package org.hammerlab.hadoop

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

object Registrar extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[SerializableConfiguration])
}
}
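
A sketch (assumed) of wiring this into an application's Kryo setup; the delegating registrator below is hypothetical, and would be passed to Spark via the spark.kryo.registrator conf key.

// Hypothetical sketch: delegate to Registrar from an application-level KryoRegistrator.
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.hammerlab.hadoop.Registrar

class AppRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    Registrar.registerClasses(kryo)  // registers SerializableConfiguration
    // ...register application classes here
  }
}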
@@ -0,0 +1,32 @@
package org.hammerlab.hadoop

import java.io.{ ObjectInputStream, ObjectOutputStream }

import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast

class SerializableConfiguration(@transient var value: Configuration)
extends Serializable {
private def writeObject(out: ObjectOutputStream): Unit = {
out.defaultWriteObject()
value.write(out)
}

private def readObject(in: ObjectInputStream): Unit = {
value = new Configuration(false)
value.readFields(in)
}
}

object SerializableConfiguration {
implicit def unwrapSerializableConfiguration(conf: SerializableConfiguration): Configuration = conf.value
implicit def unwrapSerializableConfigurationBroadcast(confBroadcast: Broadcast[SerializableConfiguration]): Configuration = confBroadcast.value.value

def apply(conf: Configuration): SerializableConfiguration =
new SerializableConfiguration(conf)

implicit class ConfWrapper(val conf: Configuration) extends AnyVal {
def serializable: SerializableConfiguration =
SerializableConfiguration(conf)
}
}
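
A usage sketch (assumed): wrap the driver's Hadoop configuration, broadcast it, and unwrap it transparently in tasks via the implicits above. The object and method names are hypothetical.

// Hypothetical sketch: ship a Hadoop Configuration to executors via a broadcast.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.hammerlab.hadoop.SerializableConfiguration._

object BroadcastConfExample {
  def defaultFSPerPartition(sc: SparkContext) = {
    val confBroadcast = sc.broadcast(sc.hadoopConfiguration.serializable)
    sc
      .parallelize(1 to 4, numSlices = 4)
      .mapPartitions { _ ⇒
        val conf: Configuration = confBroadcast  // Broadcast unwrapped by the implicit above
        Iterator(conf.get("fs.defaultFS"))
      }
  }
}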
@@ -1,15 +1,15 @@
package org.hammerlab.magic.hadoop
package org.hammerlab.hadoop

import java.io.IOException

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapred.{JobConf, SequenceFileInputFormat}
import org.apache.hadoop.fs.{ FileStatus, FileSystem, Path ⇒ HPath }
import org.apache.hadoop.mapred.{ JobConf, SequenceFileInputFormat }

/**
* [[SequenceFileInputFormat]] guaranteed to be loaded in with the same splits it was written out with.
*/
class UnsplittableSequenceFileInputFormat[K, V] extends SequenceFileInputFormat[K, V] {
override def isSplitable(fs: FileSystem, filename: Path): Boolean = false
override def isSplitable(fs: FileSystem, filename: HPath): Boolean = false

/**
* Ensure that partitions are read back in in the same order they were written.
11 changes: 5 additions & 6 deletions src/main/scala/org/hammerlab/magic/rdd/RunLengthRDD.scala
@@ -8,11 +8,12 @@ import org.hammerlab.magic.rdd.partitions.FilterPartitionIdxs._
import org.hammerlab.magic.rdd.sliding.BorrowElemsRDD._

import scala.reflect.ClassTag
import math.max

/**
* Helper for run-length encoding an [[RDD]].
*/
class RunLengthRDD[T: ClassTag](rdd: RDD[T]) {
case class RunLengthRDD[T: ClassTag](rdd: RDD[T]) {
lazy val runLengthEncode: RDD[(T, Long)] = {
val runLengthPartitions =
rdd.mapPartitions(_.runLengthEncode())
@@ -24,7 +25,7 @@ class RunLengthRDD[T: ClassTag](rdd: RDD[T]) {
val partitionOverrides =
(for {
range ← new RangeAccruingIterator(oneOrFewerElementPartitions.iterator)
sendTo = math.max(0, range.start - 1)
sendTo = max(0, range.start - 1)
i ← range
} yield
(i + 1) → sendTo
@@ -34,15 +35,13 @@ class RunLengthRDD[T: ClassTag](rdd: RDD[T]) {
runLengthPartitions
.shiftLeft(
1,
partitionOverrides,
allowIncompletePartitions = true
partitionOverrides
)
.mapPartitions(
it ⇒
reencode(
it
.map(t ⇒ t._1 → t._2.toLong)
.buffered
)
)
}
@@ -54,5 +53,5 @@ object RunLengthRDD {
kryo.register(classOf[Array[Int]])
}

implicit def rddToRunLengthRDD[T: ClassTag](rdd: RDD[T]): RunLengthRDD[T] = new RunLengthRDD(rdd)
implicit def ooRunLengthRDD[T: ClassTag](rdd: RDD[T]): RunLengthRDD[T] = RunLengthRDD(rdd)
}
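
A usage sketch (assumed): the enrichment adds runLengthEncode to any RDD and is intended to merge runs even when they cross partition boundaries. The object and method names are hypothetical.

// Hypothetical sketch: run-length encode an RDD of repeated values.
import org.apache.spark.SparkContext
import org.hammerlab.magic.rdd.RunLengthRDD._

object RunLengthExample {
  def example(sc: SparkContext) = {
    val rdd = sc.parallelize(Seq("a", "a", "b", "b", "b", "c"), numSlices = 2)
    rdd.runLengthEncode.collect  // expected: Array(("a", 2), ("b", 3), ("c", 1))
  }
}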
19 changes: 19 additions & 0 deletions src/main/scala/org/hammerlab/magic/rdd/keyed/FilterKeysRDD.scala
@@ -0,0 +1,19 @@
package org.hammerlab.magic.rdd.keyed

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

case class FilterKeysRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) {
def filterKeys(setBroadcast: Broadcast[Set[K]]): RDD[(K, V)] =
rdd
.filter {
case (k, _) ⇒
setBroadcast.value(k)
}
}

object FilterKeysRDD {
implicit def makeFilterKeysRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): FilterKeysRDD[K, V] = FilterKeysRDD(rdd)
}
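
A usage sketch (assumed; the key set is hypothetical): broadcast a key whitelist once and filter a pair-RDD against it.

// Hypothetical sketch: keep only pairs whose key is in a broadcast set.
import org.apache.spark.SparkContext
import org.hammerlab.magic.rdd.keyed.FilterKeysRDD._

object FilterKeysExample {
  def keepContigs(sc: SparkContext) = {
    val pairs = sc.parallelize(Seq("chr1" → 1, "chr2" → 2, "chrM" → 3))
    val keep = sc.broadcast(Set("chr1", "chr2"))  // hypothetical whitelist
    pairs.filterKeys(keep)                        // drops ("chrM", 3)
  }
}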
@@ -7,12 +7,12 @@ import scala.reflect.ClassTag
/**
* Adds [[maxByKey]] and [[minByKey]] helpers to an [[RDD]].
*/
class ReduceByKeyRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)])(implicit ord: Ordering[V]) {
case class ReduceByKeyRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)])(implicit ord: Ordering[V]) {
def maxByKey(numPartitions: Int = rdd.getNumPartitions): RDD[(K, V)] = rdd.reduceByKey((a, b) ⇒ ord.max(a, b))
def minByKey(numPartitions: Int = rdd.getNumPartitions): RDD[(K, V)] = rdd.reduceByKey((a, b) ⇒ ord.min(a, b))
}

object ReduceByKeyRDD {
implicit def rddToReduceByKeyRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)])(implicit ord: Ordering[V]): ReduceByKeyRDD[K, V] =
new ReduceByKeyRDD(rdd)
implicit def rddToReduceByKeyRDD[K: ClassTag, V: ClassTag : Ordering](rdd: RDD[(K, V)]): ReduceByKeyRDD[K, V] =
ReduceByKeyRDD(rdd)
}
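
A usage sketch (assumed): per-key maxima and minima via the implicit Ordering. The object and method names are hypothetical.

// Hypothetical sketch: per-key max and min.
import org.apache.spark.SparkContext
import org.hammerlab.magic.rdd.keyed.ReduceByKeyRDD._

object ReduceByKeyExample {
  def example(sc: SparkContext) = {
    val depths = sc.parallelize(Seq("s1" → 10, "s1" → 30, "s2" → 20))
    val maxes = depths.maxByKey()  // ("s1", 30), ("s2", 20)
    val mins  = depths.minByKey()  // ("s1", 10), ("s2", 20)
    (maxes, mins)
  }
}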
@@ -8,7 +8,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.Random

class KeySamples[V](var num: Long, var vs: ArrayBuffer[V], max: Int) extends Serializable {
class KeySamples[V](var num: Long, var vs: ArrayBuffer[V], max: Int)
extends Serializable {

/**
* Private buffer in which we accumulate up to 2*max elements. Public accessor @values lazily samples this down to
@@ -9,6 +9,7 @@ import org.hammerlab.magic.rdd.partitions.SlicePartitionsRDD
import org.hammerlab.spark.PartitionIndex

import scala.collection.mutable
import scala.math.ceil
import scala.reflect.ClassTag

/**
@@ -67,7 +68,7 @@ case class SplitByKeyRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) {
(for {
(k, num) ← totalKeyCounts
} yield
k → math.ceil(num.toDouble / elemsPerPartition).toInt
k → ceil(num.toDouble / elemsPerPartition).toInt
)
.toVector
.unzip
@@ -0,0 +1,38 @@
package org.hammerlab.magic.rdd.partitions

import org.apache.spark.{ OneToOneDependency, Partition, TaskContext }
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

case class AppendEmptyPartitionRDD[T: ClassTag](rdd: RDD[T])
extends RDD[T](
rdd.sparkContext,
Seq(
new OneToOneDependency(rdd)
)
) {

/** [[RDD.partitions]] is transient, so we denormalize the number of partitions here */
val num = rdd.getNumPartitions

override def compute(split: Partition, context: TaskContext): Iterator[T] =
if (split.index < num)
rdd.compute(split, context)
else
Iterator()

override def getPartitions: Array[Partition] =
rdd.partitions :+
new Partition {
override def index: Int = num
}
}

case class AppendEmptyPartition[T: ClassTag](@transient rdd: RDD[T]) {
def appendEmptyPartition: AppendEmptyPartitionRDD[T] = AppendEmptyPartitionRDD(rdd)
}

object AppendEmptyPartitionRDD {
implicit def makeAppendEmptyPartitionRDD[T: ClassTag](rdd: RDD[T]): AppendEmptyPartition[T] = AppendEmptyPartition(rdd)
}
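
A usage sketch (assumed): appending a trailing empty partition bumps the partition count by one without moving any data, which can be handy before partition-shifting operations. The object and method names are hypothetical.

// Hypothetical sketch: add one empty partition at the end of an RDD.
import org.apache.spark.SparkContext
import org.hammerlab.magic.rdd.partitions.AppendEmptyPartitionRDD._

object AppendEmptyPartitionExample {
  def example(sc: SparkContext) = {
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    val appended = rdd.appendEmptyPartition
    appended.getNumPartitions  // 5: the original 4 plus one empty partition
  }
}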