Commit

fixed checkstyle issues
dorx committed Jun 9, 2014
1 parent 9ee94ee commit 1d413ce
Showing 2 changed files with 11 additions and 8 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -266,10 +266,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
 
     val combOp = (r1: Result, r2: Result) => {
-      //take union of both key sets in case one partion doesn't contain all keys
+      // take union of both key sets in case one partition doesn't contain all keys
       val keyUnion = r1.resultMap.keys.toSet.union(r2.resultMap.keys.toSet)
 
-      //Use r2 to keep the combined result since r1 is usual empty
+      // Use r2 to keep the combined result since r1 is usually empty
       for (key <- keyUnion) {
         val entry1 = r1.resultMap.get(key)
         val entry2 = r2.resultMap.get(key)
@@ -286,7 +286,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     val zeroU = new Result(Map[K, Stratum]())
 
-    //determine threshold for each stratum and resample
+    // determine threshold for each stratum and resample
     val finalResult = self.aggregateWithContext(zeroU)(seqOp, combOp).resultMap
     val thresholdByKey = new mutable.HashMap[K, Double]()
     for ((key, stratum) <- finalResult) {
@@ -330,7 +330,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       // Bernoulli sampler
       self.mapPartitionsWithIndex((idx: Int, iter: Iterator[(K, V)]) => {
         val random = new RandomDataGenerator
-        random.reSeed(seed+idx)
+        random.reSeed(seed + idx)
         iter.filter(t => random.nextUniform(0.0, 1.0) < thresholdByKey.get(t._1).get)
       }, preservesPartitioning = true)
     }
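
For orientation: the PairRDDFunctions code above keeps an element exactly when a fresh uniform draw falls below the threshold computed for that element's key. A minimal sketch of that acceptance rule on a plain Scala collection, with scala.util.Random standing in for commons-math's RandomDataGenerator and a hard-coded map standing in for the computed thresholdByKey (both stand-ins are assumptions for illustration, not the Spark code itself):

    import scala.util.Random

    object BernoulliByKeySketch {
      def main(args: Array[String]): Unit = {
        // Illustrative per-key acceptance thresholds; the real code derives
        // these per stratum from the aggregation result.
        val thresholdByKey = Map("a" -> 0.5, "b" -> 0.1)
        val data = Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5))

        // The real code seeds one generator per partition (seed + idx);
        // a single seeded generator suffices here.
        val random = new Random(42L)
        val sampled = data.filter { case (k, _) => random.nextDouble() < thresholdByKey(k) }
        println(sampled)
      }
    }
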
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -884,15 +884,18 @@ abstract class RDD[T: ClassTag](
    * A version of {@link #aggregate()} that passes the TaskContext to the function that does
    * aggregation for each partition.
    */
-  def aggregateWithContext[U: ClassTag](zeroValue: U)(seqOp: ((TaskContext, U), T) => U, combOp: (U, U) => U): U = {
+  def aggregateWithContext[U: ClassTag](zeroValue: U)(seqOp: ((TaskContext, U), T) => U,
+      combOp: (U, U) => U): U = {
     // Clone the zero value since we will also be serializing it as part of tasks
     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
-    //pad seqOp and combOp with taskContext to conform to aggregate's signature in TraversableOnce
+    // pad seqOp and combOp with taskContext to conform to aggregate's signature in TraversableOnce
     val paddedSeqOp = (arg1: (TaskContext, U), item: T) => (arg1._1, seqOp(arg1, item))
-    val paddedcombOp = (arg1 : (TaskContext, U), arg2: (TaskContext, U)) => (arg1._1, combOp(arg1._2, arg1._2))
+    val paddedcombOp = (arg1: (TaskContext, U), arg2: (TaskContext, U)) =>
+      (arg1._1, combOp(arg1._2, arg2._2))
     val cleanSeqOp = sc.clean(paddedSeqOp)
     val cleanCombOp = sc.clean(paddedcombOp)
-    val aggregatePartition = (tc: TaskContext, it: Iterator[T]) => (it.aggregate(tc, zeroValue)(cleanSeqOp, cleanCombOp))._2
+    val aggregatePartition = (tc: TaskContext, it: Iterator[T]) =>
+      (it.aggregate(tc, zeroValue)(cleanSeqOp, cleanCombOp))._2
     val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
     jobResult
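
The RDD.scala hunk only re-wraps long lines, but the padding it touches is worth seeing in isolation: TraversableOnce.aggregate threads a single accumulator type through both operators, so carrying a TaskContext alongside U amounts to aggregating over (TaskContext, U) pairs and discarding the context at the end. A self-contained sketch, with a plain Int standing in for the TaskContext so the snippet runs without Spark (an assumption for illustration):

    object AggregateWithContextSketch {
      def main(args: Array[String]): Unit = {
        val items = Iterator(1, 2, 3, 4)
        val ctx = 7 // stand-in for a TaskContext

        // User-facing operators over U = Int, as in aggregateWithContext.
        val seqOp = (acc: (Int, Int), item: Int) => acc._2 + item
        val combOp = (u1: Int, u2: Int) => u1 + u2

        // Padded operators: thread the context through unchanged and apply
        // the user operators to the U half of each pair.
        val paddedSeqOp = (acc: (Int, Int), item: Int) => (acc._1, seqOp(acc, item))
        val paddedCombOp = (a: (Int, Int), b: (Int, Int)) => (a._1, combOp(a._2, b._2))

        val result = items.aggregate((ctx, 0))(paddedSeqOp, paddedCombOp)._2
        println(result) // 10
      }
    }
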
