-
Notifications
You must be signed in to change notification settings - Fork 0
/
ProvenanceRDD.scala
71 lines (57 loc) · 2.12 KB
/
ProvenanceRDD.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package provenance.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import provenance.data.Provenance
import symbolicprimitives.SymBase
import scala.reflect.ClassTag
import scala.util.Random
/** Trait to ensure consistent base API between Pair and non-Pair */
/**
  * Common surface shared by the pair and non-pair provenance-tracking RDD wrappers.
  *
  * Implementations expose the wrapped Spark RDD through [[rdd]]; everything else is either
  * a transformation returning another [[ProvenanceRDD]] or an action returning local data.
  *
  * @tparam T element type exposed to user functions
  */
trait ProvenanceRDD[T] extends Serializable {

  // ---------------------------------------------------------------------------------------
  // Underlying RDD access
  // ---------------------------------------------------------------------------------------

  /** The Spark RDD backing this wrapper; its element type is intentionally left unspecified. */
  def rdd: RDD[_]

  /** Assigns a name and returns this same instance (`this.type`) so calls can be chained. */
  def setName(name: String): this.type

  // ---------------------------------------------------------------------------------------
  // Transformations
  // ---------------------------------------------------------------------------------------

  /**
    * Element-wise transformation from `T` to `U`.
    *
    * @param f                  user function applied to elements
    * @param enableUDFAwareProv presumably toggles UDF-aware provenance for this call, with
    *                           `None` deferring to the implementation's default -- TODO confirm
    *                           against the implementing classes
    */
  def map[U: ClassTag](
      f: T => U,
      enableUDFAwareProv: Option[Boolean] = None): ProvenanceRDD[U]

  // Disabled for now:
  //def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U]): ProvenanceRDD[U]

  /**
    * Transformation in which each input element may yield any number of `U` outputs.
    *
    * @param f                  user function producing the outputs for one element
    * @param enableUDFAwareProv same flag as on [[map]]; semantics not visible here -- TODO confirm
    */
  def flatMap[U: ClassTag](
      f: T => TraversableOnce[U],
      enableUDFAwareProv: Option[Boolean] = None): ProvenanceRDD[U]

  /** Restricts the dataset using the predicate `f`, keeping the element type unchanged. */
  def filter(f: T => Boolean): ProvenanceRDD[T]

  // Declarations below are currently disabled and therefore not part of the shared API.
  //
  //  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): ProvenanceRDD[T]
  //
  //  def distinct(): ProvenanceRDD[T]
  //
  //  def persist(newLevel: StorageLevel): this.type
  //
  //  def persist(): this.type
  //
  //  def unpersist(blocking: Boolean = true): this.type
  //
  //  def cache(): this.type

  // ---------------------------------------------------------------------------------------
  // Actions
  // ---------------------------------------------------------------------------------------

  /** Number of records, obtained by delegating to `count()` on the underlying [[rdd]]. */
  def count(): Long = {
    rdd.count()
  }

  /** Returns the elements as a local array of plain `T` values. */
  def collect(): Array[T]

  /** Variant of [[collect]] whose result is typed as `ProvenanceRow[T]` rather than `T`. */
  def collectWithProvenance(): Array[ProvenanceRow[T]]

  /** Returns a local array of plain `T` values; `num` is the requested element count. */
  def take(num: Int): Array[T]

  /** Variant of [[take]] whose result is typed as `ProvenanceRow[T]` rather than `T`. */
  def takeWithProvenance(num: Int): Array[ProvenanceRow[T]]

  // Sampling actions are currently disabled.
  //
  //  def takeSample(withReplacement: Boolean,
  //                 num: Int,
  //                 // should technically use Spark's Utils.random
  //                 seed: Long = new Random().nextLong): Array[T]
  //
  //  def takeSampleWithProvenance(withReplacement: Boolean,
  //                               num: Int,
  //                               // should technically use Spark's Utils.random
  //                               seed: Long = new Random().nextLong): Array[ProvenanceRow[T]]
}
/** Companion holding the implicit view from tuple-typed provenance RDDs to the pair API. */
object ProvenanceRDD {
  // Implicit conversions are a language feature that must be enabled explicitly; importing it
  // here keeps the declaration below free of feature warnings.
  import scala.language.implicitConversions

  /**
    * Views a `ProvenanceRDD` of key/value tuples as a [[PairProvenanceDefaultRDD]].
    *
    * An instance that already is a `PairProvenanceDefaultRDD` is returned unchanged; a
    * `FlatProvenanceDefaultRDD` is converted through `FlatProvenanceDefaultRDD.flatToPair`.
    *
    * @param rdd the tuple-typed provenance RDD to convert
    * @tparam K key type
    * @tparam V value type
    * @return the pair-typed view of `rdd`
    * @throws NotImplementedError if `rdd` is neither of the two known implementations
    */
  implicit def toPairRDD[K: ClassTag, V: ClassTag](
      rdd: ProvenanceRDD[(K, V)]): PairProvenanceDefaultRDD[K, V] = {
    rdd match {
      case pair: PairProvenanceDefaultRDD[K, V] => pair
      case flat: FlatProvenanceDefaultRDD[(K, V)] => FlatProvenanceDefaultRDD.flatToPair(flat)
      case _ =>
        // The `s` interpolator is required for `$rdd` to be substituted; without it the
        // message contained the literal text "$rdd" instead of the offending value.
        throw new NotImplementedError(s"Unknown RDD type for pair conversion: $rdd")
    }
  }
}