Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observables packaging #1

Merged
merged 4 commits into from Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -10,7 +10,7 @@ subprojects {
apply plugin: 'maven'

group = "com.github.otah"
version = "0.1-SNAPSHOT"
version = "0.2-SNAPSHOT"

sourceCompatibility = 1.8

Expand Down
Expand Up @@ -2,7 +2,7 @@ package com.github.otah.hap.examples

import com.github.otah.hap.api.characteristics.PowerStateCharacteristic
import com.github.otah.hap.api.services.SwitchService
import com.github.otah.hap.observable.ObservableWritableCharacteristic
import com.github.otah.hap.monix.ObservableWritableCharacteristic
import monix.reactive.subjects.BehaviorSubject

case class ExampleSwitch(id: Int, label: String)
Expand Down
@@ -1,4 +1,4 @@
package com.github.otah.hap.observable
package com.github.otah.hap.monix

import com.github.otah.hap.api.Characteristic
import monix.eval.Task
Expand All @@ -11,7 +11,7 @@ abstract class ObservableCharacteristic[T](observable: Observable[T], currentVal

def this(behavior: BehaviorSubject[T])(implicit scheduler: Scheduler) = this(behavior, behavior.headL)

override val reader = Reader(currentValue.runAsync)
override val reader = ObservableReader(currentValue)

override val notifier = Some(ObservableNotifier[T](observable))
override val notifier = ObservableNotifier(observable)
}
@@ -1,12 +1,12 @@
package com.github.otah.hap.observable
package com.github.otah.hap.monix

import com.github.otah.hap.api.{Subscription, TypedNotifier}
import monix.execution.{Ack, Scheduler}
import monix.reactive.Observable

import scala.concurrent.Future

case class ObservableNotifier[T](observable: Observable[T])(implicit scheduler: Scheduler) extends TypedNotifier[T] {
class ObservableNotifier[T](observable: Observable[T])(implicit scheduler: Scheduler) extends TypedNotifier[T] {

override def subscribe(callback: T => Future[Unit]) = new Subscription {

Expand All @@ -15,3 +15,7 @@ case class ObservableNotifier[T](observable: Observable[T])(implicit scheduler:
override def unsubscribe(): Unit = subscription.cancel()
}
}

object ObservableNotifier {
def apply[T](observable: Observable[T])(implicit scheduler: Scheduler) = Some(new ObservableNotifier(observable))
}
@@ -0,0 +1,12 @@
package com.github.otah.hap.monix

import monix.eval.Task
import monix.execution.Scheduler

import scala.concurrent.{ExecutionContext, Future}

object ObservableReader {

def apply[T](currentValue: Task[T])(implicit scheduler: Scheduler): Some[ExecutionContext => Future[Option[T]]] =
Some(_ => currentValue.map(Some.apply).runAsync)
}
@@ -1,4 +1,4 @@
package com.github.otah.hap.observable
package com.github.otah.hap.monix

import monix.eval.Task
import monix.execution.Scheduler
Expand Down
@@ -1,4 +1,4 @@
package com.github.otah.hap.observable
package com.github.otah.hap.reactivex

import com.github.otah.hap.api.Characteristic
import rx.lang.scala.Observable
Expand All @@ -9,7 +9,7 @@ abstract class ObservableCharacteristic[T](observable: Observable[T], currentVal

def this(behavior: BehaviorSubject[T]) = this(behavior, behavior.head)

override val reader = Reader(currentValue.toBlocking.toFuture)
override val reader = ObservableReader(currentValue)

override val notifier = Some(ObservableNotifier[T](observable))
override val notifier = ObservableNotifier(observable)
}
@@ -1,11 +1,11 @@
package com.github.otah.hap.observable
package com.github.otah.hap.reactivex

import com.github.otah.hap.api.{Subscription, TypedNotifier}
import rx.lang.scala.Observable

import scala.concurrent.Future

case class ObservableNotifier[T](observable: Observable[T]) extends TypedNotifier[T] {
class ObservableNotifier[T](observable: Observable[T]) extends TypedNotifier[T] {

override def subscribe(callback: T => Future[Unit]) = new Subscription {

Expand All @@ -14,3 +14,7 @@ case class ObservableNotifier[T](observable: Observable[T]) extends TypedNotifie
override def unsubscribe(): Unit = subscription.unsubscribe()
}
}

object ObservableNotifier {
def apply[T](observable: Observable[T]) = Some(new ObservableNotifier(observable))
}
@@ -0,0 +1,11 @@
package com.github.otah.hap.reactivex

import rx.lang.scala.Observable

import scala.concurrent.{ExecutionContext, Future}

object ObservableReader {

def apply[T](currentValue: Observable[T]): Some[ExecutionContext => Future[Option[T]]] =
Some(_ => currentValue.map(Some.apply).toBlocking.toFuture)
}
@@ -1,4 +1,4 @@
package com.github.otah.hap.observable
package com.github.otah.hap.reactivex

import rx.lang.scala.subjects.BehaviorSubject
import rx.lang.scala.{Observable, Observer, Subject}
Expand Down