diff --git a/reductor-observable-rxjava2/.gitignore b/reductor-observable-rxjava2/.gitignore new file mode 100644 index 0000000..796b96d --- /dev/null +++ b/reductor-observable-rxjava2/.gitignore @@ -0,0 +1 @@ +/build diff --git a/reductor-observable-rxjava2/README.md b/reductor-observable-rxjava2/README.md new file mode 100644 index 0000000..397b074 --- /dev/null +++ b/reductor-observable-rxjava2/README.md @@ -0,0 +1,45 @@ +# Reductor Observable + +Combine RxJava streams to dispatch async actions and handle side effects. + +### Epic + +The core primitive is `Epic`. Epic is defined as a simple interface: + +```java +public interface Epic { + Observable run(Observable actions, Store store); +} +``` + +Epic is run once `Store` is created. +It's basically a function that takes a stream of actions and returns stream of actions. +Each object emitted by returned Observable will be dispatched back to `Store`. + +A simple example of async flow is Ping-Pong Epic which listens for `PING` action and responds with `PONG` after one second. + +```java +Epic pingPongEpic = (actions, store) -> + actions.filter(Epics.ofType("PING")) + .delay(1, TimeUnit.SECONDS) + .map(action -> Action.create("PONG")); +``` + +Each 'PONG' message will be dispatched back to store to be handled by `Reducer`. + +### Creating Middleware + +To connect `Epic` to `Store`, create `EpicMiddleware` with provided epic. +Once middleware is created, it can be passed to `Store.create`: + +```java +EpicMiddleware middleware = EpicMiddleware.create(pingPongEpic); + +Store store = Store.create(reducer, middleware); +``` + +#### Combining epics + +EpicMiddleware takes only one rootEpic in `EpicMiddleware.create`. +However, you can combine more than one epics into single Epic by using `Epics.combineEpics(epics)`. +This will simply merge all returned streams into one. diff --git a/reductor-observable-rxjava2/build.gradle b/reductor-observable-rxjava2/build.gradle new file mode 100644 index 0000000..e79cf64 --- /dev/null +++ b/reductor-observable-rxjava2/build.gradle @@ -0,0 +1,14 @@ +apply plugin: 'java' +apply plugin: 'me.tatarka.retrolambda' +apply from: '../gradle/publishing.gradle' +apply from: '../gradle/jacoco.gradle' + +dependencies { + retrolambdaConfig "net.orfjackal.retrolambda:retrolambda:${retrolambdaVersion}" + + compile project(':lib') + compile 'io.reactivex.rxjava2:rxjava:2.1.0' + + testCompile 'junit:junit:4.12' + testCompile 'org.mockito:mockito-core:1.10.19' +} \ No newline at end of file diff --git a/reductor-observable-rxjava2/gradle.properties b/reductor-observable-rxjava2/gradle.properties new file mode 100644 index 0000000..79632c7 --- /dev/null +++ b/reductor-observable-rxjava2/gradle.properties @@ -0,0 +1,2 @@ +ARTIFACT_ID=reductor-observable-rxjava2 +POM_NAME=ReductorObservableRxJava2 \ No newline at end of file diff --git a/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/Epic.java b/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/Epic.java new file mode 100644 index 0000000..db7c014 --- /dev/null +++ b/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/Epic.java @@ -0,0 +1,32 @@ +package com.yheriatovych.reductor.observable.rxjava2; + +import com.yheriatovych.reductor.Action; +import com.yheriatovych.reductor.Store; +import io.reactivex.Observable; + +/** + * Core primitive to process and dispatch actions asynchronously + * It is a function which takes a stream of actions and returns a stream of actions. + *

+ * Actions emitted by returning stream will be dispatched back to {@link Store} + *

+ * Ping-Pong example: + *


+ * Epic<String> pingPong = (actions, store) ->
+ *         actions.filter(Epics.ofType("PING"))
+ *                 .delay(1, TimeUnit.SECONDS)
+ *                 .map(action -> Action.create("PONG"));
+ * 
+ * + * @param state type of {@link Store} + */ +public interface Epic { + /** + * Functions that will be called by {@link Store} once it's created. + * + * @param actions an Observable of actions dispatched to the Store + * @param store a Store object that Epic can use to query the state + * @return an Observable of actions that will be dispatched back to Store + */ + Observable run(Observable actions, Store store); +} diff --git a/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/EpicMiddleware.java b/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/EpicMiddleware.java new file mode 100644 index 0000000..dae466c --- /dev/null +++ b/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/EpicMiddleware.java @@ -0,0 +1,59 @@ +package com.yheriatovych.reductor.observable.rxjava2; + +import com.yheriatovych.reductor.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.subjects.PublishSubject; + +/** + * Middleware that runs provided {@link Epic} after {@link Store} created + * and dispatches actions produced by Epic back to the Store. + * + * @param state type of {@link Store} + */ +public class EpicMiddleware implements Middleware, Disposable { + + private final Epic epic; + private Disposable disposable; + + private EpicMiddleware(Epic rootEpic) { + this.epic = rootEpic; + } + + /** + * Factory method to create EpicMiddleware. + *

+ * This method takes only one epic. + * However, several epics can be combined with {@link Epics#combineEpics(Iterable)}. + * + * @param rootEpic epic to run once store is created + * @param state type of {@link Store} + * @return instance of EpicMiddleware to be passed to {@link Store#create(Reducer, Middleware[])} + */ + public static EpicMiddleware create(Epic rootEpic) { + return new EpicMiddleware<>(rootEpic); + } + + @Override + public Dispatcher create(Store store, Dispatcher nextDispatcher) { + PublishSubject actions = PublishSubject.create(); + disposable = epic.run(actions, store).subscribe(store::dispatch); + return action -> { + nextDispatcher.dispatch(action); + if(action instanceof Action) { + actions.onNext((Action) action); + } + }; + } + + @Override + public void dispose() { + if (disposable != null) { + disposable.dispose(); + } + } + + @Override + public boolean isDisposed() { + return disposable != null && disposable.isDisposed(); + } +} diff --git a/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/Epics.java b/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/Epics.java new file mode 100644 index 0000000..63ec510 --- /dev/null +++ b/reductor-observable-rxjava2/src/main/java/com/yheriatovych/reductor/observable/rxjava2/Epics.java @@ -0,0 +1,35 @@ +package com.yheriatovych.reductor.observable.rxjava2; + +import com.yheriatovych.reductor.Action; +import io.reactivex.Observable; +import io.reactivex.functions.Predicate; + +public class Epics { + private Epics() { + } + + /** + * Combine several Epics into one. + *

+ * Returned streams will be merged with {@link Observable#merge(Iterable)} + * + * @param epics Epics to combine + * @param state type + * @return Epic that will combine all provided epics behaviour + */ + public static Epic combineEpics(Iterable> epics) { + return (actions, store) -> Observable.fromIterable(epics) + .flatMap(epic -> epic.run(actions, store)); + } + + /** + * Useful predicate to be used with {@link Observable#filter(Predicate)} in Epic implementation + * to filter only actions with specific {@link Action#type} + * + * @param type Action type filtered + * @return Predicate that will check if {@link Action#type} equals to specified type + */ + public static Predicate ofType(String type) { + return action -> type.equals(action.type); + } +} diff --git a/reductor-observable-rxjava2/src/test/java/com/yheriatovych/reductor/observable/rxjava2/EpicMiddlewareTest.java b/reductor-observable-rxjava2/src/test/java/com/yheriatovych/reductor/observable/rxjava2/EpicMiddlewareTest.java new file mode 100644 index 0000000..4545e5f --- /dev/null +++ b/reductor-observable-rxjava2/src/test/java/com/yheriatovych/reductor/observable/rxjava2/EpicMiddlewareTest.java @@ -0,0 +1,81 @@ +package com.yheriatovych.reductor.observable.rxjava2; + +import com.yheriatovych.reductor.Action; +import com.yheriatovych.reductor.Reducer; +import com.yheriatovych.reductor.Store; +import io.reactivex.Observable; +import io.reactivex.observers.TestObserver; +import io.reactivex.subjects.PublishSubject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.*; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +public class EpicMiddlewareTest { + + private class TestState { + + } + + private class TestReducer implements Reducer { + @Override + public TestState reduce(TestState testState, Action action) { + if(testState == null) { + testState = new TestState(); + } + return testState; + } + } + + @Spy TestReducer reducer = new TestReducer(); + PublishSubject epicObservable = PublishSubject.create(); + @Mock + Epic epic; + @Captor ArgumentCaptor> actionsCaptor; + Store store; + EpicMiddleware epicMiddleware; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + when(epic.run(actionsCaptor.capture(), any())).thenReturn(epicObservable); + epicMiddleware = EpicMiddleware.create(epic); + } + + @Test + public void testSubscribeToEpic() { + store = Store.create(reducer, epicMiddleware); + + verify(epic).run(any(), eq(store)); + assertTrue(epicObservable.hasObservers()); + } + + @Test + public void testPropagateActionsToEpic() { + store = Store.create(reducer, epicMiddleware); + + TestObserver testObserver = TestObserver.create(); + actionsCaptor.getValue().subscribe(testObserver); + + Action testAction = Action.create("TEST"); + store.dispatch(testAction); + + testObserver.assertValue(testAction); + } + + @Test + public void testUnsubscriptionEpic() { + store = Store.create(reducer, epicMiddleware); + + assertTrue("epic observable should has observers after Store.create", epicObservable.hasObservers()); + + assertFalse("Epic should not be unsubscribed", epicMiddleware.isDisposed()); + epicMiddleware.dispose(); + assertTrue("Epic should be unsubscribed", epicMiddleware.isDisposed()); + + assertFalse("epic observable is unsubscibed after middleware.unsubscribe", epicObservable.hasObservers()); + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index ff381b0..3b5ea4b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,2 @@ -include ':example', ':compiller', ':lib', 'reductor-rxjava', 'reductor-rxjava2', 'reductor-observable' +include ':example', ':compiller', ':lib', 'reductor-rxjava', 'reductor-rxjava2', 'reductor-observable', 'reductor-observable-rxjava2'