Skip to content

Commit

Permalink
added reductor observable (for rxjava2) project
Browse files Browse the repository at this point in the history
  • Loading branch information
Yarikx committed May 28, 2017
1 parent ba2f311 commit fd3aa2b
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 1 deletion.
1 change: 1 addition & 0 deletions reductor-observable-rxjava2/.gitignore
@@ -0,0 +1 @@
/build
45 changes: 45 additions & 0 deletions reductor-observable-rxjava2/README.md
@@ -0,0 +1,45 @@
# Reductor Observable

Combine RxJava streams to dispatch async actions and handle side effects.

### Epic

The core primitive is `Epic`. Epic is defined as a simple interface:

```java
public interface Epic<T> {
Observable<Object> run(Observable<Action> actions, Store<T> store);
}
```

Epic is run once `Store` is created.
It's basically a function that takes a stream of actions and returns stream of actions.
Each object emitted by returned Observable will be dispatched back to `Store`.

A simple example of async flow is Ping-Pong Epic which listens for `PING` action and responds with `PONG` after one second.

```java
Epic<String> pingPongEpic = (actions, store) ->
actions.filter(Epics.ofType("PING"))
.delay(1, TimeUnit.SECONDS)
.map(action -> Action.create("PONG"));
```

Each 'PONG' message will be dispatched back to store to be handled by `Reducer`.

### Creating Middleware

To connect `Epic` to `Store`, create `EpicMiddleware` with provided epic.
Once middleware is created, it can be passed to `Store.create`:

```java
EpicMiddleware<String> middleware = EpicMiddleware.create(pingPongEpic);

Store<String> store = Store.create(reducer, middleware);
```

#### Combining epics

EpicMiddleware takes only one rootEpic in `EpicMiddleware.create`.
However, you can combine more than one epics into single Epic by using `Epics.combineEpics(epics)`.
This will simply merge all returned streams into one.
14 changes: 14 additions & 0 deletions reductor-observable-rxjava2/build.gradle
@@ -0,0 +1,14 @@
apply plugin: 'java'
apply plugin: 'me.tatarka.retrolambda'
apply from: '../gradle/publishing.gradle'
apply from: '../gradle/jacoco.gradle'

dependencies {
retrolambdaConfig "net.orfjackal.retrolambda:retrolambda:${retrolambdaVersion}"

compile project(':lib')
compile 'io.reactivex.rxjava2:rxjava:2.1.0'

testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'
}
2 changes: 2 additions & 0 deletions reductor-observable-rxjava2/gradle.properties
@@ -0,0 +1,2 @@
ARTIFACT_ID=reductor-observable-rxjava2
POM_NAME=ReductorObservableRxJava2
@@ -0,0 +1,32 @@
package com.yheriatovych.reductor.observable.rxjava2;

import com.yheriatovych.reductor.Action;
import com.yheriatovych.reductor.Store;
import io.reactivex.Observable;

/**
* Core primitive to process and dispatch actions asynchronously
* It is a function which takes a stream of actions and returns a stream of actions.
* <p>
* Actions emitted by returning stream will be dispatched back to {@link Store}
* <p>
* Ping-Pong example:
* <pre><code>
* Epic&lt;String&gt; pingPong = (actions, store) -&gt;
* actions.filter(Epics.ofType("PING"))
* .delay(1, TimeUnit.SECONDS)
* .map(action -&gt; Action.create("PONG"));
* </code></pre>
*
* @param <T> state type of {@link Store}
*/
public interface Epic<T> {
/**
* Functions that will be called by {@link Store} once it's created.
*
* @param actions an Observable of actions dispatched to the Store
* @param store a Store object that Epic can use to query the state
* @return an Observable of actions that will be dispatched back to Store
*/
Observable<Object> run(Observable<Action> actions, Store<T> store);
}
@@ -0,0 +1,59 @@
package com.yheriatovych.reductor.observable.rxjava2;

import com.yheriatovych.reductor.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;

/**
* Middleware that runs provided {@link Epic} after {@link Store} created
* and dispatches actions produced by Epic back to the Store.
*
* @param <T> state type of {@link Store}
*/
public class EpicMiddleware<T> implements Middleware<T>, Disposable {

private final Epic<T> epic;
private Disposable disposable;

private EpicMiddleware(Epic<T> rootEpic) {
this.epic = rootEpic;
}

/**
* Factory method to create EpicMiddleware.
* <p>
* This method takes only one epic.
* However, several epics can be combined with {@link Epics#combineEpics(Iterable)}.
*
* @param rootEpic epic to run once store is created
* @param <T> state type of {@link Store}
* @return instance of EpicMiddleware to be passed to {@link Store#create(Reducer, Middleware[])}
*/
public static <T> EpicMiddleware<T> create(Epic<T> rootEpic) {
return new EpicMiddleware<>(rootEpic);
}

@Override
public Dispatcher create(Store<T> store, Dispatcher nextDispatcher) {
PublishSubject<Action> actions = PublishSubject.create();
disposable = epic.run(actions, store).subscribe(store::dispatch);
return action -> {
nextDispatcher.dispatch(action);
if(action instanceof Action) {
actions.onNext((Action) action);
}
};
}

@Override
public void dispose() {
if (disposable != null) {
disposable.dispose();
}
}

@Override
public boolean isDisposed() {
return disposable != null && disposable.isDisposed();
}
}
@@ -0,0 +1,35 @@
package com.yheriatovych.reductor.observable.rxjava2;

import com.yheriatovych.reductor.Action;
import io.reactivex.Observable;
import io.reactivex.functions.Predicate;

public class Epics {
private Epics() {
}

/**
* Combine several Epics into one.
* <p>
* Returned streams will be merged with {@link Observable#merge(Iterable)}
*
* @param epics Epics to combine
* @param <T> state type
* @return Epic that will combine all provided epics behaviour
*/
public static <T> Epic<T> combineEpics(Iterable<Epic<T>> epics) {
return (actions, store) -> Observable.fromIterable(epics)
.flatMap(epic -> epic.run(actions, store));
}

/**
* Useful predicate to be used with {@link Observable#filter(Predicate)} in Epic implementation
* to filter only actions with specific {@link Action#type}
*
* @param type Action type filtered
* @return Predicate that will check if {@link Action#type} equals to specified type
*/
public static Predicate<Action> ofType(String type) {
return action -> type.equals(action.type);
}
}
@@ -0,0 +1,81 @@
package com.yheriatovych.reductor.observable.rxjava2;

import com.yheriatovych.reductor.Action;
import com.yheriatovych.reductor.Reducer;
import com.yheriatovych.reductor.Store;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.PublishSubject;
import org.junit.Before;
import org.junit.Test;
import org.mockito.*;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

public class EpicMiddlewareTest {

private class TestState {

}

private class TestReducer implements Reducer<TestState> {
@Override
public TestState reduce(TestState testState, Action action) {
if(testState == null) {
testState = new TestState();
}
return testState;
}
}

@Spy TestReducer reducer = new TestReducer();
PublishSubject<Object> epicObservable = PublishSubject.create();
@Mock
Epic<TestState> epic;
@Captor ArgumentCaptor<Observable<Action>> actionsCaptor;
Store<TestState> store;
EpicMiddleware<TestState> epicMiddleware;

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(epic.run(actionsCaptor.capture(), any())).thenReturn(epicObservable);
epicMiddleware = EpicMiddleware.create(epic);
}

@Test
public void testSubscribeToEpic() {
store = Store.create(reducer, epicMiddleware);

verify(epic).run(any(), eq(store));
assertTrue(epicObservable.hasObservers());
}

@Test
public void testPropagateActionsToEpic() {
store = Store.create(reducer, epicMiddleware);

TestObserver<Action> testObserver = TestObserver.create();
actionsCaptor.getValue().subscribe(testObserver);

Action testAction = Action.create("TEST");
store.dispatch(testAction);

testObserver.assertValue(testAction);
}

@Test
public void testUnsubscriptionEpic() {
store = Store.create(reducer, epicMiddleware);

assertTrue("epic observable should has observers after Store.create", epicObservable.hasObservers());

assertFalse("Epic should not be unsubscribed", epicMiddleware.isDisposed());
epicMiddleware.dispose();
assertTrue("Epic should be unsubscribed", epicMiddleware.isDisposed());

assertFalse("epic observable is unsubscibed after middleware.unsubscribe", epicObservable.hasObservers());
}
}
2 changes: 1 addition & 1 deletion settings.gradle
@@ -1,2 +1,2 @@
include ':example', ':compiller', ':lib', 'reductor-rxjava', 'reductor-rxjava2', 'reductor-observable'
include ':example', ':compiller', ':lib', 'reductor-rxjava', 'reductor-rxjava2', 'reductor-observable', 'reductor-observable-rxjava2'

0 comments on commit fd3aa2b

Please sign in to comment.