Skip to content

Commit

Permalink
Added SparkSession API to SuccinctKVRDD
Browse files Browse the repository at this point in the history
  • Loading branch information
anuragkh committed Oct 17, 2016
1 parent 71b572b commit beeaff2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Expand Up @@ -9,6 +9,7 @@ import edu.berkeley.cs.succinct.util.SuccinctConstants
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import org.apache.spark.succinct.kv.SuccinctKVPartition import org.apache.spark.succinct.kv.SuccinctKVPartition
import org.apache.spark.{Dependency, Partition, SparkContext, TaskContext} import org.apache.spark.{Dependency, Partition, SparkContext, TaskContext}
Expand Down Expand Up @@ -241,8 +242,7 @@ object SuccinctKVRDD {
* @return The SuccinctKVRDD. * @return The SuccinctKVRDD.
*/ */
def apply[K: ClassTag](inputRDD: RDD[(K, Array[Byte])]) def apply[K: ClassTag](inputRDD: RDD[(K, Array[Byte])])
(implicit ordering: Ordering[K]) (implicit ordering: Ordering[K]): SuccinctKVRDD[K] = {
: SuccinctKVRDD[K] = {
val partitionsRDD = inputRDD.sortByKey().mapPartitions(createSuccinctKVPartition[K]).cache() val partitionsRDD = inputRDD.sortByKey().mapPartitions(createSuccinctKVPartition[K]).cache()
val firstKeys = partitionsRDD.map(_.firstKey).collect() val firstKeys = partitionsRDD.map(_.firstKey).collect()
new SuccinctKVRDDImpl[K](partitionsRDD, firstKeys) new SuccinctKVRDDImpl[K](partitionsRDD, firstKeys)
Expand Down Expand Up @@ -292,8 +292,9 @@ object SuccinctKVRDD {
/** /**
* Reads a SuccinctKVRDD from disk. * Reads a SuccinctKVRDD from disk.
* *
* @param sc The spark context * @param sc The spark context.
* @param location The path to read the SuccinctKVRDD from. * @param location The path to read the SuccinctKVRDD from.
* @param storageLevel Storage level of the SuccinctKVRDD.
* @return The SuccinctKVRDD. * @return The SuccinctKVRDD.
*/ */
def apply[K: ClassTag](sc: SparkContext, location: String, storageLevel: StorageLevel) def apply[K: ClassTag](sc: SparkContext, location: String, storageLevel: StorageLevel)
Expand All @@ -315,4 +316,18 @@ object SuccinctKVRDD {
val firstKeys = succinctPartitions.map(_.firstKey).collect() val firstKeys = succinctPartitions.map(_.firstKey).collect()
new SuccinctKVRDDImpl[K](succinctPartitions, firstKeys) new SuccinctKVRDDImpl[K](succinctPartitions, firstKeys)
} }

/**
* Reads a SuccinctKVRDD from disk.
*
* @param spark The spark session.
* @param location The path to read the SuccinctKVRDD from.
* @param storageLevel Storage level of the SuccinctKVRDD.
* @return The SuccinctKVRDD.
*/
def apply[K: ClassTag](spark: SparkSession, location: String, storageLevel: StorageLevel)
(implicit ordering: Ordering[K])
: SuccinctKVRDD[K] = {
apply(spark.sparkContext, location, storageLevel)
}
} }
Expand Up @@ -2,6 +2,7 @@ package edu.berkeley.cs.succinct


import org.apache.spark.SparkContext import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel


import scala.reflect.ClassTag import scala.reflect.ClassTag
Expand All @@ -14,6 +15,12 @@ package object kv {
: SuccinctKVRDD[K] = SuccinctKVRDD[K](sc, filePath, storageLevel) : SuccinctKVRDD[K] = SuccinctKVRDD[K](sc, filePath, storageLevel)
} }


implicit class SuccinctSession(spark: SparkSession) {
def succinctKV[K: ClassTag](filePath: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
(implicit ordering: Ordering[K])
: SuccinctKVRDD[K] = SuccinctKVRDD[K](spark, filePath, storageLevel)
}

implicit class SuccinctPairedRDD[K: ClassTag](rdd: RDD[(K, Array[Byte])]) implicit class SuccinctPairedRDD[K: ClassTag](rdd: RDD[(K, Array[Byte])])
(implicit ordering: Ordering[K]) { (implicit ordering: Ordering[K]) {
def succinctKV: SuccinctKVRDD[K] = { def succinctKV: SuccinctKVRDD[K] = {
Expand Down

0 comments on commit beeaff2

Please sign in to comment.