Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Novoda contributions #15

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -54,6 +54,9 @@ out
*.iml
atlassian-ide-plugin.xml

# AndroidStudio specific files/directories
local.properties

# Eclipse specific files/directories
.classpath
.project
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/rx/android/exception/CancelledException.java
@@ -0,0 +1,8 @@
package rx.android.exception;

public class CancelledException extends NavigationException {
@Override
public String getMessage() {
return "User cancelled the action";
}
}
4 changes: 4 additions & 0 deletions src/main/java/rx/android/exception/FailedException.java
@@ -0,0 +1,4 @@
package rx.android.exception;

public class FailedException extends NavigationException {
}
4 changes: 4 additions & 0 deletions src/main/java/rx/android/exception/NavigationException.java
@@ -0,0 +1,4 @@
package rx.android.exception;

public abstract class NavigationException extends Throwable {
}
9 changes: 9 additions & 0 deletions src/main/java/rx/android/observables/ActivityStarter.java
@@ -0,0 +1,9 @@
package rx.android.observables;

import android.content.Intent;

public interface ActivityStarter {

void startActivityForResult(Intent intent, int requestCode);

}
103 changes: 103 additions & 0 deletions src/main/java/rx/android/observables/ReactiveDialog.java
@@ -0,0 +1,103 @@
package rx.android.observables;

import android.app.DialogFragment;
import android.app.FragmentManager;
import android.content.DialogInterface;
import android.os.Bundle;

import java.util.UUID;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.android.exception.CancelledException;
import rx.resumable.SubscriberVault;

public class ReactiveDialog<T> extends DialogFragment {

private static final String REACTIVE_DIALOG_KEY = "REACTIVE_DIALOG_KEY";

private static final SubscriberVault subscriberVault = new SubscriberVault();

protected interface ReactiveDialogListener<T> extends Observer<T> {
void onCompleteWith(T value);

void onCancel();
}

public Observable<T> show(final FragmentManager manager) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(rx.Subscriber<? super T> subscriber) {
UUID key = subscriberVault.store(subscriber);
storeSubscriberKey(key);
show(manager, getClass().getSimpleName());
}
});
}

@Override
public void onCancel(DialogInterface dialog) {
super.onCancel(dialog);
getListener().onCancel();
}

protected ReactiveDialogListener<T> getListener() {
Subscriber<Object> subscriber = subscriberVault.get(getSubscriberKey());
if (subscriber == null) {
throw new IllegalStateException("No listener attached, you are probably trying to deliver a result after completion of the observable");
}
return new ReactiveDialogObserver<T>(subscriber);
}

private void storeSubscriberKey(UUID key) {
if (getArguments() == null) {
setArguments(new Bundle());
}
getArguments().putSerializable(REACTIVE_DIALOG_KEY, key);
}

private UUID getSubscriberKey() {
return (UUID) getArguments().getSerializable(REACTIVE_DIALOG_KEY);
}

private class ReactiveDialogObserver<T> implements ReactiveDialogListener<T> {

private final Subscriber<? super T> subscriber;

public ReactiveDialogObserver(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onNext(T value) {
subscriber.onNext(value);
}

@Override
public void onCompleteWith(T value) {
subscriber.onNext(value);
subscriber.onCompleted();
subscriberVault.remove(getSubscriberKey());
}

@Override
public void onCancel() {
subscriber.onError(new CancelledException());
subscriberVault.remove(getSubscriberKey());
}

@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
subscriberVault.remove(getSubscriberKey());
}

@Override
public void onCompleted() {
subscriber.onCompleted();
subscriberVault.remove(getSubscriberKey());
}
}

}
66 changes: 66 additions & 0 deletions src/main/java/rx/android/observables/ReactiveNavigator.java
@@ -0,0 +1,66 @@
package rx.android.observables;

import android.app.Activity;
import android.content.Intent;
import android.util.SparseArray;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.android.exception.CancelledException;
import rx.android.exception.FailedException;

public class ReactiveNavigator {

private final ActivityStarter activityStarter;

private static final SparseArray<Observer<? super Intent>> activityResultObservers = new SparseArray<Observer<? super Intent>>();

public ReactiveNavigator(ActivityStarter activityStarter) {
this.activityStarter = activityStarter;
}

public Observable<Intent> toActivityForResult(final Intent intent, final int requestCode) {
return Observable.create(new Observable.OnSubscribe<Intent>() {
@Override
public void call(Subscriber<? super Intent> subscriber) {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
activityResultObservers.remove(requestCode);
}

@Override
public boolean isUnsubscribed() {
return activityResultObservers.get(requestCode) == null;
}
});
activityResultObservers.put(requestCode, subscriber);
activityStarter.startActivityForResult(intent, requestCode);
}
});
}

public void onActivityResult(int requestCode, int resultCode, Intent data) {
Observer<? super Intent> observer = activityResultObservers.get(requestCode);
if (observer == null) {
return;
}
switch (resultCode) {
case Activity.RESULT_OK:
observer.onNext(data);
observer.onCompleted();
activityResultObservers.remove(requestCode);
break;
case Activity.RESULT_CANCELED:
observer.onError(new CancelledException());
activityResultObservers.remove(requestCode);
break;
default:
observer.onError(new FailedException());
break;
}
}

}
34 changes: 34 additions & 0 deletions src/main/java/rx/resumable/ObservableVault.java
@@ -0,0 +1,34 @@
package rx.resumable;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import rx.Observable;

public class ObservableVault {

private final Map<Integer, Map<Integer, Observable>> arraysOfObservables = new HashMap<Integer, Map<Integer, Observable>>();

public synchronized Observable put(ResumableReference reference, int id, Observable observable) {
return getObservablesFor(reference).put(id, observable);
}

public synchronized Observable remove(ResumableReference reference, int id) {
return getObservablesFor(reference).remove(id);
}

public synchronized Map<Integer, Observable> getImmutableObservablesFor(ResumableReference reference) {
return Collections.unmodifiableMap(new HashMap<Integer, Observable>(getObservablesFor(reference)));
}

private synchronized Map<Integer, Observable> getObservablesFor(ResumableReference reference) {
int referenceId = reference.getResumableId();
if (arraysOfObservables.containsKey(referenceId)) {
return arraysOfObservables.get(referenceId);
}
Map<Integer, Observable> observableMap = new HashMap<Integer, Observable>();
arraysOfObservables.put(referenceId, observableMap);
return observableMap;
}
}
7 changes: 7 additions & 0 deletions src/main/java/rx/resumable/ObserverFactory.java
@@ -0,0 +1,7 @@
package rx.resumable;

import rx.resumable.observer.ResumableObserver;

public interface ObserverFactory {
ResumableObserver createObserver(int code);
}
5 changes: 5 additions & 0 deletions src/main/java/rx/resumable/ResumableReference.java
@@ -0,0 +1,5 @@
package rx.resumable;

public interface ResumableReference {
int getResumableId();
}
118 changes: 118 additions & 0 deletions src/main/java/rx/resumable/ResumableSubscriber.java
@@ -0,0 +1,118 @@
package rx.resumable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import rx.Observable;
import rx.Subscription;
import rx.resumable.observer.ResumableObserver;
import rx.resumable.operators.DropEventOperator;
import rx.resumable.operators.EventCachingOperator;
import rx.resumable.operators.EventForwardingListener;
import rx.resumable.subject.ForwardingSubject;

public class ResumableSubscriber {

private final List<SubscriptionWithId> subscriptions = new ArrayList<SubscriptionWithId>();
private final ResumableReference resumableReference;
private final ObservableVault observableVault;
private final ObserverFactory factory;

public ResumableSubscriber(ResumableReference resumableReference, ObserverFactory factory, ObservableVault observableVault) {
this.resumableReference = resumableReference;
this.observableVault = observableVault;
this.factory = factory;
}

public void resume() {
Map<Integer, Observable> observableMap = observableVault.getImmutableObservablesFor(resumableReference);
for (Map.Entry<Integer, Observable> observableEntry : observableMap.entrySet()) {
int observerId = observableEntry.getKey();
Observable observable = observableEntry.getValue();
final Subscription subscription = observable.subscribe(factory.createObserver(observerId));
subscriptions.add(new SubscriptionWithId(subscription, observerId));
}
}

public void pause() {
for (Subscription subscription : subscriptions) {
subscription.unsubscribe();
}
subscriptions.clear();
}

public <T> void subscribe(Observable<? extends T> observable, ResumableObserver<T> observer) {
ForwardingSubject<T> proxy = new ForwardingSubject<T>(new EventCachingOperator<T>(new CleanObservable(observer.getId())));
observable.subscribe(proxy);
subscribe(observer, proxy);
}

public <T> void subscribeWithReplay(Observable<? extends T> observable, ResumableObserver<T> observer) {
Observable<? extends T> replayEnabled = observable.cache();
subscribe(observer, replayEnabled);
}

public <T> void subscribeWithDrop(Observable<? extends T> observable, ResumableObserver<T> observer) {
ForwardingSubject<T> proxy = new ForwardingSubject<T>(new DropEventOperator<T>(new CleanObservable(observer.getId())));
observable.subscribe(proxy);
subscribe(observer, proxy);
}

private <T> void subscribe(final ResumableObserver<T> observer, final Observable<? extends T> observable) {
observableVault.put(resumableReference, observer.getId(), observable);
final SubscriptionWithId subscription = new SubscriptionWithId(observable.subscribe(observer), observer.getId());
subscriptions.add(subscription);
}

public void unsubscribe(final int observerId) {
for (Iterator<SubscriptionWithId> iterator = subscriptions.iterator(); iterator.hasNext(); ) {
SubscriptionWithId subscription = iterator.next();
if (subscription.getId() == observerId) {
subscription.unsubscribe();
iterator.remove();
}
}
observableVault.remove(resumableReference, observerId);
}

private class CleanObservable implements EventForwardingListener {

private final int id;

public CleanObservable(int id) {
this.id = id;
}

@Override
public void allEventsForwarded() {
observableVault.remove(resumableReference, id);
}
}

private static final class SubscriptionWithId implements Subscription {
private final Subscription subscription;
private final int id;

private SubscriptionWithId(final Subscription subscription, final int id) {
this.subscription = subscription;
this.id = id;
}

public int getId() {
return id;
}

@Override
public void unsubscribe() {
subscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return subscription.isUnsubscribed();
}
}

}