Skip to content

Commit

Permalink
Merge 80a5b2f into 17035f9
Browse files Browse the repository at this point in the history
  • Loading branch information
NoamShaish committed Feb 10, 2019
2 parents 17035f9 + 80a5b2f commit 9cd8a0b
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 61 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,12 @@

[coverage_icon]: https://coveralls.io/repos/github/NoamShaish/containers/badge.svg?branch=master
[coverage_link]: https://coveralls.io/github/NoamShaish/containers?branch=master


### Release
```
sbt
> set version := "x.x.x"
> publishSigned
> sonatypeRelease
```
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ lazy val core = (project in file("core"))
)

// Spark-backed container instances.
// `compile->compile;test->test` makes core's main classes visible to spark's main code and
// core's test classes visible to spark's tests, so shared test code can be reused.
lazy val spark = (project in file("spark"))
  .dependsOn(core % "compile->compile;test->test")
  .settings(
    name := "containers-spark",
    libraryDependencies ++= Seq(
      // Spark itself is expected to be supplied by the runtime, hence "provided".
      "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
      "org.scalatest" %% "scalatest" % "3.0.5" % "test",
      // spark-testing-base versions are published as <sparkVersion>_<libraryVersion>.
      "com.holdenkarau" %% "spark-testing-base" % s"${sparkVersion}_0.10.0" % "test"
    )
  )

Expand Down
68 changes: 68 additions & 0 deletions core/src/main/scala/containers/Container.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,32 @@ trait Container[C[_]] {
/** Number of elements of `c` for which `p` holds; the default predicate accepts every element. */
def count[A](c: C[A])(p: A => Boolean = (_: A) => true): Long

/** Collapses the elements of `c` into a single `A` using the binary function `f`. */
def reduce[A](c: C[A])(f: (A, A) => A): A

/** Returns a container derived from `c` by de-duplication (the List instance uses `List.distinct`). */
def distinct[A](c: C[A]): C[A]

/** Combines `c` and `other` into one container (the List instance uses `List.union`). */
def union[A](c: C[A], other: C[A]): C[A]

/**
 * Sorts `c` by the key extracted with `f`.
 *
 * @param ascending sort direction; defaults to `true`. The List instance reverses the
 *                  implicit `Ordering[K]` when this is `false`.
 */
def sortBy[A, K : Ordering : ClassTag](c: C[A], f: A => K, ascending: Boolean = true): C[A]

/** Elements common to `c` and `other` (the List instance uses `List.intersect`). */
def intersection[A](c: C[A], other: C[A]): C[A]

/** Pairs every element of `c` with every element of `other`. */
def cartesian[A, B: ClassTag](c: C[A], other: C[B]): C[(A, B)]

/** Groups the elements of `c` under the key computed by `f`, one `(key, elements)` pair per key. */
def groupBy[A, K: ClassTag](c: C[A], f: A => K): C[(K, Iterable[A])]

/**
 * Pairs up the elements of `c` and `other` (the List instance uses `List.zip`).
 * NOTE(review): behaviour for containers of different sizes is not specified here -- confirm per instance.
 */
def zip[A, B: ClassTag](c: C[A], other: C[B]): C[(A, B)]

/** Applies `f` to the elements of `c`, purely for its side effects. */
def foreach[A](c: C[A], f: A => Unit): Unit
}

/**
 * Type class for containers holding key/value pairs. Extends [[Container]] with by-key
 * aggregations; the annotation below customises the compiler error shown when no instance
 * is in implicit scope for `C`.
 */
@implicitNotFound("No member of type class PairContainer in scope for ${C}")
trait PairContainer[C[_]] extends Container[C] {
/** Merges the values that share a key with `f`, returning `(key, mergedValue)` pairs. */
def reduceByKey[K: ClassTag, V: ClassTag](c: C[(K, V)])(f: (V, V) => V): C[(K, V)]

/**
 * Aggregates the values of each key into a result of type `B`, which may differ from `V`.
 *
 * @param createCombiner turns a single value into a `B`
 * @param mergeValue     adds one more value to an existing `B`
 * @param mergeCombiners merges two `B`s into one
 * NOTE(review): parameter names match Spark's `combineByKey`; presumably the same contract
 * is intended -- confirm against the Spark-backed instance.
 */
def combineByKey[K: ClassTag, V: ClassTag, B](c: C[(K, V)])(
createCombiner: V => B,
mergeValue: (B, V) => B,
mergeCombiners: (B, B) => B): C[(K, B)]
}

object Container {
Expand All @@ -45,10 +66,29 @@ object Container {
/** Method syntax for `Container[C].count` on the wrapped container `c`; counts everything by default. */
def count(p: A => Boolean = (_: A) => true): Long = Container[C].count(c)(p)

/** Method syntax for `Container[C].reduce` on the wrapped container `c`. */
def reduce(f: (A, A) => A): A = Container[C].reduce(c)(f)

/** Method syntax for `Container[C].distinct` on the wrapped container `c`. */
def distinct: C[A] = Container[C].distinct(c)

/** Method syntax for `Container[C].union`, with the wrapped container `c` as the left operand. */
def union(other: C[A]): C[A] = Container[C].union(c, other)

/** Method syntax for `Container[C].sortBy`; `ascending` defaults to `true`. */
def sortBy[K : Ordering : ClassTag](f: A => K, ascending: Boolean = true): C[A] = Container[C].sortBy(c, f, ascending)

/** Method syntax for `Container[C].intersection`, with the wrapped container `c` as the left operand. */
def intersection(other: C[A]): C[A] = Container[C].intersection(c, other)

/**
 * Method syntax for `Container[C].cartesian`, with the wrapped container `c` as the left side.
 * The result type is spelled out so this public member does not rely on inference.
 */
def cartesian[B: ClassTag](other: C[B]): C[(A, B)] = Container[C].cartesian(c, other)

/** Method syntax for `Container[C].groupBy` on the wrapped container `c`. */
def groupBy[K: ClassTag](f: A => K): C[(K, Iterable[A])] = Container[C].groupBy(c, f)

/** Method syntax for `Container[C].zip`, with the wrapped container `c` as the left side. */
def zip[B: ClassTag](other: C[B]): C[(A, B)] = Container[C].zip(c, other)

/** Method syntax for `Container[C].foreach` on the wrapped container `c`. */
def foreach(f: A => Unit): Unit = Container[C].foreach(c, f)
}

/**
 * Method syntax for key/value containers: lets `reduceByKey` and `combineByKey` be called
 * directly on any `C[(K, V)]` that has a `PairContainer[C]` instance in implicit scope.
 */
implicit class PairContainerOps[K: ClassTag, V: ClassTag, C[_]: PairContainer](c: C[(K, V)]) {

  // Single place where the instance is looked up, using the same `Container[K, V, C]`
  // lookup for both operations.
  private def instance = Container[K, V, C]

  /** Delegates to `reduceByKey` of the resolved instance with the wrapped container. */
  def reduceByKey(f: (V, V) => V): C[(K, V)] =
    instance.reduceByKey(c)(f)

  /** Delegates to `combineByKey` of the resolved instance with the wrapped container. */
  def combineByKey[B](createCombiner: V => B, mergeValue: (B, V) => B, mergeCombiners: (B, B) => B): C[(K, B)] =
    instance.combineByKey(c)(createCombiner, mergeValue, mergeCombiners)
}
}

Expand All @@ -64,11 +104,39 @@ object Container {
/** Counts the elements satisfying `p` via `List.count`, widening the `Int` result to `Long`. */
override def count[A](c: List[A])(p: (A) => Boolean = (_: A) => true): Long = c.count(p).toLong

/** Delegates to `List.reduce`, which throws `UnsupportedOperationException` on an empty list. */
override def reduce[A](c: List[A])(f: (A, A) => A): A = c.reduce(f)

/** Delegates to `List.distinct`. */
override def distinct[A](c: List[A]): List[A] = c.distinct

/** Concatenates `other` after `c`; duplicates are kept. */
override def union[A](c: List[A], other: List[A]): List[A] = c ++ other

/**
 * Sorts `c` by the key `f` extracts, using the implicit `Ordering[K]` as is when
 * `ascending` is true and its reverse otherwise.
 */
override def sortBy[A, K: Ordering : ClassTag](c: List[A], f: A => K, ascending: Boolean): List[A] = {
  val keyOrdering: Ordering[K] =
    if (ascending) implicitly[Ordering[K]]
    else implicitly[Ordering[K]].reverse
  c.sortBy(f)(keyOrdering)
}

/**
 * Delegates to `List.intersect`, a multiset intersection: an element occurring n times in `c`
 * and m times in `other` appears min(n, m) times in the result.
 * NOTE(review): Spark's `RDD.intersection` de-duplicates its output; confirm whether the List
 * instance is meant to keep duplicates.
 */
override def intersection[A](c: List[A], other: List[A]): List[A] = c.intersect(other)

/** Every `(left, right)` combination, iterating `c` in the outer position and `other` in the inner one. */
override def cartesian[A, B: ClassTag](c: List[A], other: List[B]): List[(A, B)] =
  c.flatMap(left => other.map(right => (left, right)))

/**
 * Groups with `List.groupBy` and flattens the resulting map into `(key, elements)` pairs.
 * The order of the pairs is the iteration order of that intermediate map.
 */
override def groupBy[A, K: ClassTag](c: List[A], f: A => K): List[(K, Iterable[A])] = c.groupBy(f).toList

/** Delegates to `List.zip`, which stops at the end of the shorter list. */
override def zip[A, B: ClassTag](c: List[A], other: List[B]): List[(A, B)] = c.zip(other)

/** Delegates to `List.foreach`, applying `f` to each element in list order. */
override def foreach[A](c: List[A], f: A => Unit): Unit = c.foreach(f)
}

/**
 * `PairContainer` instance for `List`: both operations group the pairs by key in memory
 * and then aggregate the values of each group.
 */
private sealed class PairListContainer extends ListContainer with PairContainer[List] {

  /**
   * Reduces the values of each key with `f`.
   *
   * @return one `(key, reducedValue)` pair per distinct key in `c`
   */
  override def reduceByKey[K: ClassTag, V: ClassTag](c: List[(K, V)])(f: (V, V) => V): List[(K, V)] =
    c.groupBy(_._1).map { case (k, vl) => (k, vl.map(_._2).reduce(f)) }.toList

  /**
   * Aggregates the values of each key into a `B`.
   *
   * The first value seen for a key is turned into a combiner with `createCombiner`; every
   * further value of that key is folded in with `mergeValue`, in list order. A list is a
   * single in-memory sequence, so there are never two partial combiners for the same key and
   * `mergeCombiners` is not needed here.
   *
   * @param createCombiner builds the initial combiner from the first value of a key
   * @param mergeValue     folds each subsequent value of the key into the combiner
   * @param mergeCombiners accepted for interface compatibility; unused by this instance
   * @return one `(key, combiner)` pair per distinct key in `c`
   */
  override def combineByKey[K: ClassTag, V: ClassTag, B](c: List[(K, V)])(
      createCombiner: V => B,
      mergeValue: (B, V) => B,
      mergeCombiners: (B, B) => B): List[(K, B)] =
    c.groupBy(_._1).map { case (k, vl) =>
      val values = vl.map(_._2)
      // groupBy never yields an empty group, so head/tail cannot fail here.
      (k, values.tail.foldLeft(createCombiner(values.head))(mergeValue))
    }.toList
}

/** Implicit `Container` instance for `List`, backed by a `ListContainer`. */
implicit val listContainer: Container[List] = new ListContainer
Expand Down

0 comments on commit 9cd8a0b

Please sign in to comment.