From 4bda5f4f3019fac83ada83b2c3d9dbd8c66b4628 Mon Sep 17 00:00:00 2001 From: Shingo Omura Date: Thu, 29 May 2014 18:17:17 -0700 Subject: [PATCH 1/2] Introduced 'rxjava-scalaz' project. This project provides some type class instances for Observable. * Monoid: Observable obviously forms a monoid in terms of concatenation. * Functor, Applicative, Monad, MonadPlus: Observable can be a Stream-like Monad and can be MonadPlus as well as Monoid. * Traverse, Foldable: Observable can be Stream-like traversable. __NOTE: The operations for the instance is blocking calls.__ * etc. About Testing, property based tests are applied by Scalaz's ScalaCheck binding. For QuickStart, please refer to rx.java.scala.scalaz.examples.RxScalazDemo. --- rxjava-contrib/rxjava-scalaz/README.md | 73 ++++++++++++++ rxjava-contrib/rxjava-scalaz/build.gradle | 55 +++++++++++ .../scala/rx/lang/scala/scalaz/package.scala | 81 ++++++++++++++++ .../lang/scala/scalaz/ImplicitsForTest.scala | 45 +++++++++ .../scala/scalaz/ObservableEqualSpec.scala | 42 ++++++++ .../scala/scalaz/ObservableIsEmptySpec.scala | 37 +++++++ .../scala/scalaz/ObservableMonadSpec.scala | 40 ++++++++ .../scala/scalaz/ObservableMonoidSpec.scala | 37 +++++++ .../scala/scalaz/ObservableTraverseSpec.scala | 37 +++++++ .../lang/scala/scalaz/ObservableZipSpec.scala | 39 ++++++++ .../scala/scalaz/examples/RxScalazDemo.scala | 97 +++++++++++++++++++ settings.gradle | 3 +- 12 files changed, 585 insertions(+), 1 deletion(-) create mode 100644 rxjava-contrib/rxjava-scalaz/README.md create mode 100644 rxjava-contrib/rxjava-scalaz/build.gradle create mode 100644 rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ImplicitsForTest.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableEqualSpec.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableIsEmptySpec.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonadSpec.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonoidSpec.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableTraverseSpec.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableZipSpec.scala create mode 100644 rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/examples/RxScalazDemo.scala diff --git a/rxjava-contrib/rxjava-scalaz/README.md b/rxjava-contrib/rxjava-scalaz/README.md new file mode 100644 index 0000000000..6fe870eb33 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/README.md @@ -0,0 +1,73 @@ +# rxjava-scalaz +This provides some useful type class instances for `Observable`. Therefore you can apply scalaz's fancy operators to `Observable`. + +Provided type class instances are `Monoid`, `Monad`, `MonadPlus`, `Traverse`, `Foldable`, etc. + +For QuickStart, please refer to [RxScalazDemo](./src/test/scala/rx/lang/scala/scalaz/examples/RxScalazDemo.scala). + +## How to use + +```scala +import scalaz._, Scalaz._ +import rx.lang.scala.Observable +import rx.lang.scala.scalaz._ + +Observable.items(1, 2) |+| Observable.items(3, 4) // == Observable.items(1 2 3 4) +Observable.items(1, 2) ∘ {_ + 1} // == Observable.items(2, 3) +(Observable.items(1, 2) |@| Observable.items(3, 4)) {_ + _} // == Observable.items(4, 5, 5, 6) +1.η[Observable] // == Observable.items(1) +(Observable.items(3) >>= {(i: Int) => Observable.items(i + 1)}) // Observable.items(4) +``` + +Some other useful operators are available. Please see below for details. + +## Provided Typeclass Instances +### Monoid +`Observable` obviously forms a monoid interms of [`concat`](https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#concat). + +```scala +(Observable.items(1, 2) |+| Observable.items(3, 4)) === Observable.items(1, 2, 3, 4) +(Observable.items(1, 2) ⊹ Observable.items(3, 4)) === Observable.items(1, 2, 3, 4) +mzero[Observable[Int]] === Observable.empty +``` + +### Monad, MonadPlus +Essentially, `Observable` is similar to `Stream`. So, `Observable` can be a Stream-like `Monad` and can be a `MonadPlus` as well as `Monoid`. Of course, `Observable` can be also `Functor` and `Applicative`. + +```scala +// Functor operators +(Observable.items(1, 2) ∘ {_ + 1}) === Observable.items(2, 3) +(Observable.items(1, 2) >| 5) === Observable.items(5, 5) +Observable.items(1, 2).fpair === Observable.items((1, 1), (2, 2)) +Observable.items(1, 2).fproduct {_ + 1} === Observable.items((1, 2), (2, 3)) +Observable.items(1, 2).strengthL("x") === Observable.items(("x", 1), ("x", 2)) +Observable.items(1, 2).strengthR("x") === Observable.items((1, "x"), (2, "x")) +Functor[Observable].lift {(_: Int) + 1}(Observable.items(1, 2)) === Observable.items(2, 3) + +// Applicative operators +1.point[Observable] === Observable.items(1) +1.η[Observable] === Observable.items(1) +(Observable.items(1, 2) |@| Observable.items(3, 4)) {_ + _} === Observable.items(4, 5, 5, 6) +(Observable.items(1) <*> {(_: Int) + 1}.η[Observable]) === Observable.items(2) +Observable.items(1) <* Observable.items(2) === Observable.items(1) +Observable.items(1) *> Observable.items(2) === Observable.items(2) + +// Monad and MonadPlus operators +(Observable.items(3) >>= {(i: Int) => Observable.items(i + 1)}) === Observable.items(4) +Observable.items(3) >> Observable.items(2) === Observable.items(2) +Observable.items(Observable.items(1, 2), Observable.items(3, 4)).μ === Observable.items(1, 2, 3, 4) +Observable.items(1, 2) <+> Observable.items(3, 4) === Observable.items(1, 2, 3, 4) +``` + +### Traverse and Foldable +`Observable` can be `Traverse` and `Foldable` as well as `Stream`. This means you can fold `Observable` instance to single value. + +```scala +Observable.items(1, 2, 3).foldMap {_.toString} === "123" +Observable.items(1, 2, 3).foldLeftM(0)((acc, v) => (acc * v).some) === 6.some +Observable.items(1, 2, 3).suml === 6 +Observable.items(1, 2, 3).∀(_ > 3) === true +Observable.items(1, 2, 3).∃(_ > 3) === false +Observable.items(1, 2, 3).traverse(x => (x + 1).some) === Observable.items(2, 3, 4).some +Observable.items(1.some, 2.some).sequence === Observable.items(1, 2).some +``` diff --git a/rxjava-contrib/rxjava-scalaz/build.gradle b/rxjava-contrib/rxjava-scalaz/build.gradle new file mode 100644 index 0000000000..96a3d59a88 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/build.gradle @@ -0,0 +1,55 @@ +apply plugin: 'scala' +apply plugin: 'osgi' + +tasks.withType(ScalaCompile) { + scalaCompileOptions.fork = true + scalaCompileOptions.unchecked = true + scalaCompileOptions.setAdditionalParameters(['-feature']) + + configure(scalaCompileOptions.forkOptions) { + memoryMaximumSize = '1g' + jvmArgs = ['-XX:MaxPermSize=512m'] + } +} + +sourceSets { + main { + scala { + srcDir 'src/main/scala' + } + } + test { + scala { + srcDir 'src/test/scala' + } + } +} + +dependencies { + compile project(':rxjava-core') + compile project(':language-adaptors:rxjava-scala') + compile 'org.scalaz:scalaz-core_2.10:7.0.4' + + testCompile 'org.scalaz:scalaz-scalacheck-binding_2.10:7.0.4' + testCompile 'org.typelevel:scalaz-specs2_2.10:0.1.2' + testCompile 'junit:junit-dep:4.10' +} + +tasks.compileScala { + classpath = classpath + (configurations.compile + configurations.provided) +} + +task repl(type: Exec) { + commandLine 'scala', '-cp', sourceSets.test.runtimeClasspath.asPath + standardInput = System.in +} + +jar { + manifest { + name = 'rxjava-scalaz' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,!org.scalatest.*,*' + instruction 'Fragment-Host', 'com.netflix.rxjava.core' + } +} diff --git a/rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala b/rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala new file mode 100644 index 0000000000..85667b6ddd --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala @@ -0,0 +1,81 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala + +import scala.language.higherKinds +import _root_.scalaz._ +import _root_.scalaz.Tags.{Zip => TZip} +import scala.concurrent._ +import scala.concurrent.duration._ + +/** + * This package object provides some type class instances for Observable. + */ +package object scalaz { + + // Monoid + implicit def observableMonoid[A] = new Monoid[Observable[A]] { + override def zero: Observable[A] = Observable.empty + override def append(f1: Observable[A], f2: => Observable[A]): Observable[A] = f1 ++ f2 + } + + implicit val observableInstances = new MonadPlus[Observable] with Zip[Observable] + with IsEmpty[Observable] with Traverse[Observable] { + + // Monad + override def point[A](a: => A) = Observable.items(a) + override def bind[A, B](oa: Observable[A])(f: (A) => Observable[B]) = oa.flatMap(f) + + // MonadPlus + override def empty[A]: Observable[A] = observableMonoid[A].zero + override def plus[A](a: Observable[A], b: => Observable[A]): Observable[A] = observableMonoid[A].append(a, b) + + // Zip + override def zip[A, B](a: => Observable[A], b: => Observable[B]): Observable[(A, B)] = a zip b + + // IsEmpty (NOTE: This method is blocking call) + override def isEmpty[A](fa: Observable[A]): Boolean = getOne(fa.isEmpty) + + // Traverse (NOTE: This method is blocking call) + override def traverseImpl[G[_], A, B](fa: Observable[A])(f: (A) => G[B])(implicit G: Applicative[G]): G[Observable[B]] = { + val seed: G[Observable[B]] = G.point(Observable.empty) + getOne(fa.foldLeft(seed) { + (ys, x) => G.apply2(ys, f(x))((bs, b) => bs :+ b) + }.head) + } + + // This method extracts first elements from Observable. + // NOTE: Make sure that 'o' has at least one element. + private[this] def getOne[T](o: Observable[T]): T = { + val p = Promise[T] + val sub = o.first.subscribe(p.success(_)) + try { + Await.result(p.future, Duration.Inf) + } finally { + sub.unsubscribe() + } + } + } + + // Observable can be ZipList like applicative functor. + // However, due to https://github.com/scalaz/scalaz/issues/338, + // This instance doesn't have 'implicit' modifier. + val observableZipApplicative: Applicative[({ type λ[α] = Observable[α] @@ TZip })#λ] = new Applicative[({ type λ[α] = Observable[α] @@ TZip })#λ] { + def point[A](a: => A) = TZip(Observable.items(a).repeat) + def ap[A, B](oa: => Observable[A] @@ TZip)(of: => Observable[A => B] @@ TZip) = TZip(of.zip(oa) map {fa => fa._1(fa._2)}) + } + +} diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ImplicitsForTest.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ImplicitsForTest.scala new file mode 100644 index 0000000000..030285772a --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ImplicitsForTest.scala @@ -0,0 +1,45 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz + +import scalaz._ +import rx.lang.scala.Observable +import org.scalacheck.Arbitrary +import scala.concurrent.{Await, Promise} +import scala.concurrent.duration.Duration + +/** + * This object provides implicits for tests. + */ +object ImplicitsForTest { + + // Equality based on sequenceEqual() method. + implicit def observableEqual[A](implicit eqA: Equal[A]) = new Equal[Observable[A]]{ + def equal(a1: Observable[A], a2: Observable[A]) = { + val p = Promise[Boolean] + val sub = a1.sequenceEqual(a2).firstOrElse(false).subscribe(v => p.success(v)) + try { + Await.result(p.future, Duration.Inf) + } finally { + sub.unsubscribe() + } + } + } + + implicit def observableArbitrary[A](implicit a: Arbitrary[A], array: Arbitrary[Array[A]]): Arbitrary[Observable[A]] + = Arbitrary(for (arr <- array.arbitrary) yield Observable.items(arr:_*)) + +} diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableEqualSpec.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableEqualSpec.scala new file mode 100644 index 0000000000..8bd05a8dc7 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableEqualSpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz + +import scalaz._ +import Scalaz._ +import scalaz.scalacheck.ScalazProperties._ +import rx.lang.scala.Observable +import org.specs2.scalaz.Spec +import org.specs2.runner.JUnitRunner +import org.junit.runner.RunWith + +/** + * Even though Equal[Observable[A]] instance is only for tests. + * However, we should test Equal[Observable[A]] instance, + * Because the result of whole test is based on this instance. + */ +@RunWith(classOf[JUnitRunner]) +class ObservableEqualSpec extends Spec{ + + import rx.lang.scala.scalaz._ + import ImplicitsForTest._ + + "Observable" should { + "satisfies equal laws" in { + checkAll(equal.laws[Observable[Int]]) + } + } +} diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableIsEmptySpec.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableIsEmptySpec.scala new file mode 100644 index 0000000000..622d754cb4 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableIsEmptySpec.scala @@ -0,0 +1,37 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz + +import scalaz._ +import Scalaz._ +import scalaz.scalacheck.ScalazProperties._ +import rx.lang.scala.Observable +import org.specs2.scalaz.Spec +import org.specs2.runner.JUnitRunner +import org.junit.runner.RunWith + +@RunWith(classOf[JUnitRunner]) +class ObservableIsEmptySpec extends Spec{ + + import rx.lang.scala.scalaz._ + import ImplicitsForTest._ + + "Observable" should { + "satisfies isEmpty laws" in { + checkAll(isEmpty.laws[Observable]) + } + } +} diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonadSpec.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonadSpec.scala new file mode 100644 index 0000000000..775267fdb1 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonadSpec.scala @@ -0,0 +1,40 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz + +import scalaz._ +import Scalaz._ +import scalaz.scalacheck.ScalazProperties._ +import rx.lang.scala.Observable +import org.specs2.scalaz.Spec +import org.specs2.runner.JUnitRunner +import org.junit.runner.RunWith + +@RunWith(classOf[JUnitRunner]) +class ObservableMonadSpec extends Spec { + + import rx.lang.scala.scalaz._ + import ImplicitsForTest._ + + "Observable" should { + "satisfies monad laws" in { + checkAll(monad.laws[Observable]) + } + "satisfies monadplus laws" in { + checkAll(monadPlus.strongLaws[Observable]) + } + } +} diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonoidSpec.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonoidSpec.scala new file mode 100644 index 0000000000..52e13d5622 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableMonoidSpec.scala @@ -0,0 +1,37 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz + +import scalaz._ +import Scalaz._ +import scalaz.scalacheck.ScalazProperties._ +import rx.lang.scala.Observable +import org.specs2.scalaz.Spec +import org.specs2.runner.JUnitRunner +import org.junit.runner.RunWith + +@RunWith(classOf[JUnitRunner]) +class ObservableMonoidSpec extends Spec { + + import rx.lang.scala.scalaz._ + import ImplicitsForTest._ + + "Observable" should { + "satisfies monoid laws" in { + checkAll(monoid.laws[Observable[Int]]) + } + } +} \ No newline at end of file diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableTraverseSpec.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableTraverseSpec.scala new file mode 100644 index 0000000000..9e67db4882 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableTraverseSpec.scala @@ -0,0 +1,37 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz + +import scalaz._ +import Scalaz._ +import scalaz.scalacheck.ScalazProperties._ +import rx.lang.scala.Observable +import org.specs2.scalaz.Spec +import org.specs2.runner.JUnitRunner +import org.junit.runner.RunWith + +@RunWith(classOf[JUnitRunner]) +class ObservableTraverseSpec extends Spec { + + import rx.lang.scala.scalaz._ + import ImplicitsForTest._ + + "Observable" should { + "satisfies traverse laws" in { + checkAll(traverse.laws[Observable]) + } + } +} \ No newline at end of file diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableZipSpec.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableZipSpec.scala new file mode 100644 index 0000000000..139d2d2473 --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/ObservableZipSpec.scala @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz + +import scalaz._ +import Scalaz._ +import rx.lang.scala.Observable +import org.specs2.scalaz.Spec +import org.specs2.runner.JUnitRunner +import org.junit.runner.RunWith + +@RunWith(classOf[JUnitRunner]) +class ObservableZipSpec extends Spec { + + import rx.lang.scala.scalaz._ + import ImplicitsForTest._ + import org.scalacheck.Prop._ + + "Zip Operators" should { + "be able to appy to Observable" in { + forAll { (ob:Observable[Int], f: Int => Int) => + (ob <*|*> (_ map f)) === (ob zip (ob map f)) + } + } + } +} diff --git a/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/examples/RxScalazDemo.scala b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/examples/RxScalazDemo.scala new file mode 100644 index 0000000000..33d69f160b --- /dev/null +++ b/rxjava-contrib/rxjava-scalaz/src/test/scala/rx/lang/scala/scalaz/examples/RxScalazDemo.scala @@ -0,0 +1,97 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.scalaz.examples + +import org.specs2.scalaz.Spec +import org.junit.runner.RunWith +import org.specs2.runner.JUnitRunner +import rx.lang.scala.Observable +import rx.lang.scala.Observable.items +import scalaz._ +import Scalaz._ + +/** + * This demonstrates how you can apply Scalaz's operators to Observables. + */ +@RunWith(classOf[JUnitRunner]) +class RxScalazDemo extends Spec { + + import rx.lang.scala.scalaz._ + import ImplicitsForTest._ + + "RxScalazDemo" should { + "Monoid Operators" >> { + "can apply to Observables" in { + (items(1, 2) |+| items(3, 4)) === items(1, 2, 3, 4) + (items(1, 2) ⊹ items(3, 4)) === items(1, 2, 3, 4) + mzero[Observable[Int]] === Observable.empty + } + } + + "Functor Operators" >> { + "can apply to Observables" in { + (items(1, 2) ∘ {_ + 1}) === items(2, 3) + (items(1, 2) >| 5) === items(5, 5) + (items(1, 2) as 4) === items(4, 4) + + items(1, 2).fpair === items((1, 1), (2, 2)) + items(1, 2).fproduct {_ + 1} === items((1, 2), (2, 3)) + items(1, 2).strengthL("x") === items(("x", 1), ("x", 2)) + items(1, 2).strengthR("x") === items((1, "x"), (2, "x")) + Functor[Observable].lift {(_: Int) + 1}(items(1, 2)) === items(2, 3) + } + } + + "Applicative Operators" >> { + "can apply to Observables" in { + 1.point[Observable] === items(1) + 1.η[Observable] === items(1) + (items(1, 2) |@| items(3, 4)) {_ + _} === items(4, 5, 5, 6) + + (items(1) <*> {(_: Int) + 1}.η[Observable]) === items(2) + items(1) <*> {items(2) <*> {(_: Int) + (_: Int)}.curried.η[Observable]} === items(3) + items(1) <* items(2) === items(1) + items(1) *> items(2) === items(2) + + Apply[Observable].ap(items(2)) {{(_: Int) + 3}.η[Observable]} === items(5) + Apply[Observable].lift2 {(_: Int) * (_: Int)}(items(1, 2), items(3, 4)) === items(3, 4, 6, 8) + } + } + + "Monad and MonadPlus Opeartors" >> { + "can apply to Observables" in { + (items(3) >>= {(i: Int) => items(i + 1)}) === items(4) + items(3) >> items(2) === items(2) + items(items(1, 2), items(3, 4)).μ === items(1, 2, 3, 4) + items(1, 2) <+> items(3, 4) === items(1, 2, 3, 4) + + PlusEmpty[Observable].empty[Int] === Observable.empty + } + } + + "Traverse and Foldable Opearators" >> { + "can apply to Observables" in { + items(1, 2, 3).foldMap {_.toString} === "123" + items(1, 2, 3).foldLeftM(0)((acc, v) => (acc * v).some) === 6.some + items(1, 2, 3).suml === 6 + items(1, 2, 3).∀(_ > 3) === true + items(1, 2, 3).∃(_ > 3) === false + items(1, 2, 3).traverse(x => (x + 1).some) === items(2, 3, 4).some + items(1.some, 2.some).sequence === items(1, 2).some + } + } + } +} diff --git a/settings.gradle b/settings.gradle index 35d90374f8..a4421cdfb7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,4 +16,5 @@ include 'rxjava-core', \ 'rxjava-contrib:rxjava-debug', \ 'rxjava-contrib:rxjava-async-util', \ 'rxjava-contrib:rxjava-computation-expressions',\ -'rxjava-contrib:rxjava-quasar' +'rxjava-contrib:rxjava-quasar',\ +'rxjava-contrib:rxjava-scalaz' From 6cf9bf9580389e36d07d4e1b95f87a21ac22c0ec Mon Sep 17 00:00:00 2001 From: Shingo Omura Date: Sat, 31 May 2014 11:04:16 -0700 Subject: [PATCH 2/2] replaced getOne method which is based on promise with toBlocking.first --- .../scala/rx/lang/scala/scalaz/package.scala | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala b/rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala index 85667b6ddd..774fd84a0a 100644 --- a/rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala +++ b/rxjava-contrib/rxjava-scalaz/src/main/scala/rx/lang/scala/scalaz/package.scala @@ -18,8 +18,6 @@ package rx.lang.scala import scala.language.higherKinds import _root_.scalaz._ import _root_.scalaz.Tags.{Zip => TZip} -import scala.concurrent._ -import scala.concurrent.duration._ /** * This package object provides some type class instances for Observable. @@ -47,26 +45,14 @@ package object scalaz { override def zip[A, B](a: => Observable[A], b: => Observable[B]): Observable[(A, B)] = a zip b // IsEmpty (NOTE: This method is blocking call) - override def isEmpty[A](fa: Observable[A]): Boolean = getOne(fa.isEmpty) + override def isEmpty[A](fa: Observable[A]): Boolean = fa.isEmpty.toBlocking.first // Traverse (NOTE: This method is blocking call) override def traverseImpl[G[_], A, B](fa: Observable[A])(f: (A) => G[B])(implicit G: Applicative[G]): G[Observable[B]] = { val seed: G[Observable[B]] = G.point(Observable.empty) - getOne(fa.foldLeft(seed) { + fa.foldLeft(seed) { (ys, x) => G.apply2(ys, f(x))((bs, b) => bs :+ b) - }.head) - } - - // This method extracts first elements from Observable. - // NOTE: Make sure that 'o' has at least one element. - private[this] def getOne[T](o: Observable[T]): T = { - val p = Promise[T] - val sub = o.first.subscribe(p.success(_)) - try { - Await.result(p.future, Duration.Inf) - } finally { - sub.unsubscribe() - } + }.toBlocking.first } }