Change Java countByKey, countApproxDistinctByKey return types to use Java Long, not Scala; update similar methods for consistency on java.lang.Long.valueOf with no API change
srowen committed Jan 6, 2016
1 parent b2467b3 commit 293b5e4
Showing 5 changed files with 49 additions and 42 deletions.
32 changes: 19 additions & 13 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,8 +17,9 @@

package org.apache.spark.api.java

import java.{lang => jl}
import java.lang.{Iterable => JIterable}
import java.util.{Comparator, List => JList, Map => JMap}
import java.util.{Comparator, List => JList}

import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -139,7 +140,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
fractions: java.util.Map[K, Double],
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed))

@@ -154,7 +155,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Use Utils.random.nextLong as the default seed for the random number generator.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double]): JavaPairRDD[K, V] =
fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, Utils.random.nextLong)

/**
@@ -168,7 +169,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* two additional passes.
*/
def sampleByKeyExact(withReplacement: Boolean,
fractions: JMap[K, Double],
fractions: java.util.Map[K, Double],
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed))

@@ -184,7 +185,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*
* Use Utils.random.nextLong as the default seed for the random number generator.
*/
def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
def sampleByKeyExact(
withReplacement: Boolean,
fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)

/**
@@ -292,7 +295,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))

/** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey())
def countByKey(): java.util.Map[K, jl.Long] =
mapAsSerializableJavaMap(rdd.countByKey().mapValues(jl.Long.valueOf))

/**
* Approximate version of countByKey that can return a partial result if it does
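To make the caller-side effect concrete, here is a minimal Java sketch (not part of the diff) of countByKey() after this change; the local JavaSparkContext setup and the sample data are assumptions for illustration only.

import java.util.Arrays;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CountByKeyExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "CountByKeyExample");
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));
    // countByKey() now declares java.lang.Long values, so no (Map<K, Long>) (Object) cast is needed.
    Map<String, Long> counts = pairs.countByKey();
    long aCount = counts.get("a");  // unboxes a java.lang.Long
    System.out.println("a occurs " + aCount + " times");
    sc.stop();
  }
}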
@@ -934,9 +938,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* It must be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
{
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner)
: JavaPairRDD[K, jl.Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)).
asInstanceOf[JavaPairRDD[K, jl.Long]]
}

/**
@@ -950,8 +955,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* It must be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, jl.Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)).
asInstanceOf[JavaPairRDD[K, jl.Long]]
}

/**
@@ -964,8 +970,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD))
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, jl.Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD)).asInstanceOf[JavaPairRDD[K, jl.Long]]
}

/** Assign a name to this RDD */
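Likewise for the approximate variants: a short sketch of how countApproxDistinctByKey reads from Java after the change, reusing the imports and the JavaSparkContext sc from the example above plus java.util.List; the data and parameters are made up.

JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(Arrays.asList(
    new Tuple2<>(1, 10), new Tuple2<>(1, 20), new Tuple2<>(2, 30)));
// The element type is now Tuple2<Integer, java.lang.Long>, so no casting through Object:
List<Tuple2<Integer, Long>> approx = pairRdd.countApproxDistinctByKey(0.05, 4).collect();
for (Tuple2<Integer, Long> t : approx) {
  System.out.println("key " + t._1() + " has ~" + t._2() + " distinct values");
}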
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.java

import java.{lang => jl}
import java.lang.{Iterable => JIterable, Long => JLong}
import java.lang.{Iterable => JIterable}
import java.util.{Comparator, Iterator => JIterator, List => JList}

import scala.collection.JavaConverters._
@@ -305,8 +305,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*/
def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
def zipWithUniqueId(): JavaPairRDD[T, jl.Long] = {
JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, jl.Long]]
}

/**
@@ -316,8 +316,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a spark job when this RDD contains more than one partitions.
*/
def zipWithIndex(): JavaPairRDD[T, JLong] = {
JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
def zipWithIndex(): JavaPairRDD[T, jl.Long] = {
JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, jl.Long]]
}

// Actions (launch a job to return a value to the user program)
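A brief sketch of the zipWith* methods from the Java side under the same convention; the JavaSparkContext sc, the JavaRDD import, and the data are assumed, not part of the commit.

JavaRDD<String> names = sc.parallelize(Arrays.asList("alpha", "beta", "gamma"));
// Both methods now advertise java.lang.Long, matching the boxed values they already produced.
JavaPairRDD<String, Long> indexed = names.zipWithIndex();      // 0, 1, 2, ...
JavaPairRDD<String, Long> uniqueIds = names.zipWithUniqueId(); // k, n+k, 2n+k, ...
for (Tuple2<String, Long> t : indexed.collect()) {
  System.out.println(t._1() + " -> " + t._2());
}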
@@ -448,7 +448,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): java.util.Map[T, jl.Long] =
mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))
mapAsSerializableJavaMap(rdd.countByValue().mapValues(jl.Long.valueOf))

/**
* (Experimental) Approximate version of countByValue().
@@ -631,8 +631,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* The asynchronous version of `count`, which returns a
* future for counting the number of elements in this RDD.
*/
def countAsync(): JavaFutureAction[JLong] = {
new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
def countAsync(): JavaFutureAction[jl.Long] = {
new JavaFutureActionWrapper[Long, jl.Long](rdd.countAsync(), jl.Long.valueOf)
}

/**
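For the action side, a hedged sketch of countAsync() and countByValue() as a Java caller now sees them, reusing the JavaRDD names from the previous sketch; checked-exception handling and the JavaFutureAction import are omitted for brevity.

JavaFutureAction<Long> futureCount = names.countAsync();
Long total = futureCount.get();  // blocks; a java.lang.Long, usable like any java.util.concurrent.Future result
System.out.println("total elements: " + total);
// countByValue() likewise returns java.util.Map<T, java.lang.Long>:
Map<String, Long> valueCounts = names.countByValue();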
18 changes: 9 additions & 9 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1580,11 +1580,11 @@ public void countApproxDistinctByKey() {
}
double relativeSD = 0.001;
JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
for (Tuple2<Integer, Object> resItem : res) {
double count = (double)resItem._1();
Long resCount = (Long)resItem._2();
Double error = Math.abs((resCount - count) / count);
List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
for (Tuple2<Integer, Long> resItem : res) {
double count = resItem._1();
long resCount = resItem._2();
double error = Math.abs((resCount - count) / count);
Assert.assertTrue(error < 0.1);
}

@@ -1633,12 +1633,12 @@ public Tuple2<Integer, Integer> call(Integer i) {
fractions.put(0, 0.5);
fractions.put(1, 1.0);
JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey();
Map<Integer, Long> wrCounts = wr.countByKey();
Assert.assertEquals(2, wrCounts.size());
Assert.assertTrue(wrCounts.get(0) > 0);
Assert.assertTrue(wrCounts.get(1) > 0);
JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
Map<Integer, Long> worCounts = (Map<Integer, Long>) (Object) wor.countByKey();
Map<Integer, Long> worCounts = wor.countByKey();
Assert.assertEquals(2, worCounts.size());
Assert.assertTrue(worCounts.get(0) > 0);
Assert.assertTrue(worCounts.get(1) > 0);
@@ -1659,12 +1659,12 @@ public Tuple2<Integer, Integer> call(Integer i) {
fractions.put(0, 0.5);
fractions.put(1, 1.0);
JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey();
Map<Integer, Long> wrExactCounts = wrExact.countByKey();
Assert.assertEquals(2, wrExactCounts.size());
Assert.assertTrue(wrExactCounts.get(0) == 2);
Assert.assertTrue(wrExactCounts.get(1) == 4);
JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey();
Map<Integer, Long> worExactCounts = worExact.countByKey();
Assert.assertEquals(2, worExactCounts.size());
Assert.assertTrue(worExactCounts.get(0) == 2);
Assert.assertTrue(worExactCounts.get(1) == 4);
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,7 +17,7 @@

package org.apache.spark.streaming.api.java

import java.lang.{Long => JLong}
import java.{lang => jl}
import java.util.{List => JList}

import scala.collection.JavaConverters._
@@ -50,8 +50,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T

def wrapRDD(in: RDD[T]): R

implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
in.map(new JLong(_))
implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[jl.Long] = {
in.map(jl.Long.valueOf)
}

/**
@@ -74,14 +74,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): JavaDStream[JLong] = dstream.count()
def count(): JavaDStream[jl.Long] = dstream.count()

/**
* Return a new DStream in which each RDD contains the counts of each distinct value in
* each RDD of this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
*/
def countByValue(): JavaPairDStream[T, JLong] = {
def countByValue(): JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(dstream.countByValue())
}
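On the streaming side, a small usage sketch of the corresponding DStream element types; the JavaStreamingContext jssc, the socket source, and the port are assumptions, not taken from the commit.

JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<Long> batchCounts = lines.count();                    // one java.lang.Long per batch
JavaPairDStream<String, Long> valueCounts = lines.countByValue(); // values are java.lang.Long
valueCounts.print();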

@@ -91,7 +91,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* partitions.
* @param numPartitions number of partitions of each RDD in the new DStream.
*/
def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
def countByValue(numPartitions: Int): JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
}

@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in
* the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
dstream.countByWindow(windowDuration, slideDuration)
}

@@ -116,7 +116,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* DStream's batching interval
*/
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[T, JLong] = {
: JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(
dstream.countByValueAndWindow(windowDuration, slideDuration))
}
@@ -133,7 +133,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* @param numPartitions number of partitions of each RDD in the new DStream.
*/
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
: JavaPairDStream[T, JLong] = {
: JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(
dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
}
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,7 +17,8 @@

package org.apache.spark.streaming.api.java

import java.lang.{Iterable => JIterable, Long => JLong}
import java.{lang => jl}
import java.lang.{Iterable => JIterable}
import java.util.{List => JList}

import scala.collection.JavaConverters._
@@ -847,7 +848,7 @@ object JavaPairDStream {
}

def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
: JavaPairDStream[K, JLong] = {
DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
: JavaPairDStream[K, jl.Long] = {
DStream.toPairDStreamFunctions(dstream.dstream).mapValues(jl.Long.valueOf)
}
}
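The diff also consistently replaces new jl.Long(_) with jl.Long.valueOf. Beyond consistency, valueOf can return cached boxes for small values instead of always allocating (and the Long(long) constructor was later deprecated); a tiny Java illustration of the difference, with value equality unaffected either way.

Long a = Long.valueOf(127L);
Long b = Long.valueOf(127L);
System.out.println(a == b);        // true: values in [-128, 127] come from the same cache
Long c = new Long(127L);           // always allocates a fresh object
Long d = new Long(127L);
System.out.println(c == d);        // false: distinct objects
System.out.println(c.equals(d));   // true: equals() compares the underlying values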
