Permalink
Browse files

added the possibility to persist tuples of DListPersisters

  • Loading branch information...
1 parent 9af68ed commit 0de91cd295ac437cc9e45e2c8b54967dac36968a @etorreborre etorreborre committed Sep 25, 2012
View
59 src/main/scala/com/nicta/scoobi/application/Persister.scala
@@ -38,7 +38,7 @@ trait Persister[In] {
}
/** Persister type class instances for tuples. */
-object Persister {
+object Persister extends LowImplicitsPersister {
lazy val logger = LogFactory.getLog("scoobi.Persister")
/** Evaluate and persist a distributed computation. This can be a combination of one or more
@@ -301,6 +301,63 @@ object Persister {
}
}
+/**
+ * Those additional implicits allow to pass tuples of persisters to persist at once with the persist method
+ * @see PersisterSpec
+ */
+trait LowImplicitsPersister {
+
+ implicit def tuple2persisters[T1, T2] (implicit p1: Persister[T1], p2: Persister[T2]) =
+ new Persister[(T1, T2)] {
+ type Out = (p1.Out, p2.Out)
+ def apply(x: (T1, T2), conf: ScoobiConfiguration) =
+ (p1.apply(x._1, conf), p2.apply(x._2, conf))
+ }
+
+ implicit def tuple3persisters[T1, T2, T3] (implicit p1: Persister[T1], p2: Persister[T2], p3: Persister[T3]) =
+ new Persister[(T1, T2, T3)] {
+ type Out = (p1.Out, p2.Out, p3.Out)
+ def apply(x: (T1, T2, T3), conf: ScoobiConfiguration) =
+ (p1.apply(x._1, conf), p2.apply(x._2, conf), p3.apply(x._3, conf))
+ }
+
+ implicit def tuple4persisters[T1, T2, T3, T4] (implicit p1: Persister[T1], p2: Persister[T2], p3: Persister[T3], p4: Persister[T4]) =
+ new Persister[(T1, T2, T3, T4)] {
+ type Out = (p1.Out, p2.Out, p3.Out, p4.Out)
+ def apply(x: (T1, T2, T3, T4), conf: ScoobiConfiguration) =
+ (p1.apply(x._1, conf), p2.apply(x._2, conf), p3.apply(x._3, conf), p4.apply(x._4, conf))
+ }
+
+ implicit def tuple5persisters[T1, T2, T3, T4, T5] (implicit p1: Persister[T1], p2: Persister[T2], p3: Persister[T3], p4: Persister[T4], p5: Persister[T5]) =
+ new Persister[(T1, T2, T3, T4, T5)] {
+ type Out = (p1.Out, p2.Out, p3.Out, p4.Out, p5.Out)
+ def apply(x: (T1, T2, T3, T4, T5), conf: ScoobiConfiguration) =
+ (p1.apply(x._1, conf), p2.apply(x._2, conf), p3.apply(x._3, conf), p4.apply(x._4, conf), p5.apply(x._5, conf))
+ }
+
+ implicit def tuple6persisters[T1, T2, T3, T4, T5, T6] (implicit p1: Persister[T1], p2: Persister[T2], p3: Persister[T3], p4: Persister[T4], p5: Persister[T5], p6: Persister[T6]) =
+ new Persister[(T1, T2, T3, T4, T5, T6)] {
+ type Out = (p1.Out, p2.Out, p3.Out, p4.Out, p5.Out, p6.Out)
+ def apply(x: (T1, T2, T3, T4, T5, T6), conf: ScoobiConfiguration) =
+ (p1.apply(x._1, conf), p2.apply(x._2, conf), p3.apply(x._3, conf), p4.apply(x._4, conf), p5.apply(x._5, conf), p6.apply(x._6, conf))
+ }
+
+ implicit def tuple7persisters[T1, T2, T3, T4, T5, T6, T7] (implicit p1: Persister[T1], p2: Persister[T2], p3: Persister[T3], p4: Persister[T4], p5: Persister[T5], p6: Persister[T6], p7: Persister[T7]) =
+ new Persister[(T1, T2, T3, T4, T5, T6, T7)] {
+ type Out = (p1.Out, p2.Out, p3.Out, p4.Out, p5.Out, p6.Out, p7.Out)
+ def apply(x: (T1, T2, T3, T4, T5, T6, T7), conf: ScoobiConfiguration) =
+ (p1.apply(x._1, conf), p2.apply(x._2, conf), p3.apply(x._3, conf), p4.apply(x._4, conf), p5.apply(x._5, conf), p6.apply(x._6, conf), p7.apply(x._7, conf))
+ }
+
+ implicit def tuple8persisters[T1, T2, T3, T4, T5, T6, T7, T8] (implicit p1: Persister[T1], p2: Persister[T2], p3: Persister[T3], p4: Persister[T4], p5: Persister[T5], p6: Persister[T6], p7: Persister[T7], p8: Persister[T8]) =
+ new Persister[(T1, T2, T3, T4, T5, T6, T7, T8)] {
+ type Out = (p1.Out, p2.Out, p3.Out, p4.Out, p5.Out, p6.Out, p7.Out, p8.Out)
+ def apply(x: (T1, T2, T3, T4, T5, T6, T7, T8), conf: ScoobiConfiguration) =
+ (p1.apply(x._1, conf), p2.apply(x._2, conf), p3.apply(x._3, conf), p4.apply(x._4, conf), p5.apply(x._5, conf), p6.apply(x._6, conf), p7.apply(x._7, conf), p8.apply(x._8, conf))
+ }
+
+
+}
/** The container for persisting a DList. */
case class DListPersister[A](dlist: DList[A], sink: DataSink[_, _, A]) {
View
7 src/test/scala/com/nicta/scoobi/application/PersisterSpec.scala
@@ -13,6 +13,13 @@ class PersisterSpec extends NictaHadoop with SimpleJobs {
persist(lists)
dirs.map(dirResults).flatten.toSet must_== Set("1","2","3","4","5","6")
}
+ "A tuple containing a sequence of DLists can be persisted to text files" >> { implicit sc: ScoobiConfiguration =>
+ val dirs = Seq.fill(4)(TempFiles.createTempDir("test"))
+ val lists = Seq(DList(1, 2), DList(3, 4), DList(5, 6)).zip(dirs).map { case (l, d) => toTextFile(l, path(d)) }
+ val firstList = toTextFile(DList(7, 8, 9), path(dirs.last))
+ persist((firstList, lists))
+ dirs.map(dirResults).flatten.toSet must_== Set("1","2","3","4","5","6","7","8","9")
+ }
"A sequence of DLists can be persisted simultaneously with the run method" >> { implicit sc: ScoobiConfiguration =>
val lists = Seq(DList(1, 2), DList(3, 4), DList(5, 6))
lists.run.flatten.toSet must_== Set(1,2,3,4,5,6)

0 comments on commit 0de91cd

Please sign in to comment.