Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5774,6 +5774,27 @@ public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}

/**
* Maps the items of this observable by applying a mapper function to each value which may also
* throw a checked exception.
* <p>
* Throwing a checked exception terminates the sequence with an onError event.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code mapIO} is a pass-through for backpressure; it doesn't interfere with requests
* or number of items delivered to downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mapIO} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result type
* @param <E> the exception type
* @param mapper the function that maps the items of this Observable into (potentially) other type and values.
* @return the new Observable instance
*/
public final <R, E extends Exception> Observable<R> mapIO(Func1E<? super T, ? extends R, E> mapper) {
return create(new OnSubscribeMapIO<T, R, E>(this, mapper));
}

private final <R> Observable<R> mapNotification(Func1<? super T, ? extends R> onNext, Func1<? super Throwable, ? extends R> onError, Func0<? extends R> onCompleted) {
return lift(new OperatorMapNotification<T, R>(onNext, onError, onCompleted));
}
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/rx/functions/Func1E.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.functions;

import rx.annotations.Experimental;

/**
* Represents a functional interface that accepts a value and returns another value
* or throws.
*
* @param <T> the input value type
* @param <R> the output value type
* @param <E> the exception type
*
* @since experimental
*/
@Experimental
public interface Func1E<T, R, E extends Exception> extends Function {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
* @throws E the exception of the function
*/
R call(T t) throws E;
}
106 changes: 106 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeMapIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;
import rx.functions.Func1E;
import rx.plugins.RxJavaPlugins;

/**
* Operator that maps a sequence of values into other values via a custom mapper function
* which allows throwing an exception as well.
*
* @param <T> the incoming value type
* @param <R> the outgoing value type
* @param <E> the exception type
*/
public final class OnSubscribeMapIO<T, R, E extends Exception> implements OnSubscribe<R> {
final Observable<? extends T> source;
final Func1E<? super T, ? extends R, E> mapper;

public OnSubscribeMapIO(Observable<? extends T> source, Func1E<? super T, ? extends R, E> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
public void call(Subscriber<? super R> t) {
source.unsafeSubscribe(new MapIOSubscriber<T, R, E>(t, mapper));
}

/**
* The mapping subscriber that takes in Ts, returns Rs and potentially throws Es.
*
* @param <T> the incoming value type
* @param <R> the outgoing value type
* @param <E> the exception type
*/
static final class MapIOSubscriber<T, R, E extends Exception> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1E<? super T, ? extends R, E> mapper;

/** Strong guarantee that stops delivering events when the mapper throws. */
boolean done;

public MapIOSubscriber(Subscriber<? super R> actual, Func1E<? super T, ? extends R, E> mapper) {
// sharing SubscriptionList, pass-through for backpressure
super(actual);
this.actual = actual;
this.mapper = mapper;
}

@Override
public void onNext(T t) {
if (done) {
return;
}

R v;

try {
v = mapper.call(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}

actual.onNext(v);
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
return;
}
done = true;
actual.onError(e);
}

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
actual.onCompleted();
}
}
}
118 changes: 118 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeMapIOTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import java.io.IOException;

import org.junit.Test;

import rx.Observable;
import rx.exceptions.*;
import rx.functions.Func1E;
import rx.observers.TestSubscriber;

public class OnSubscribeMapIOTest {
final Func1E<Integer, Integer, Exception> ADD_ONE = new Func1E<Integer, Integer, Exception>() {
@Override
public Integer call(Integer a) throws Exception {
return a + 1;
}
};

final Func1E<Integer, Integer, Exception> TRHOW_CHECKED = new Func1E<Integer, Integer, Exception>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling

@Override
public Integer call(Integer a) throws Exception {
if (a == 2) {
throw new IOException();
}
return a + 1;
}
};

final Func1E<Integer, Integer, Exception> TRHOW_UNCHECKED = new Func1E<Integer, Integer, Exception>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling

@Override
public Integer call(Integer a) throws Exception {
if (a == 2) {
throw new TestException();
}
return a + 1;
}
};

final Func1E<Integer, Integer, Exception> TRHOW_FATAL = new Func1E<Integer, Integer, Exception>() {
@Override
public Integer call(Integer a) throws Exception {
if (a == 2) {
throw new OnErrorNotImplementedException(new IOException());
}
return a + 1;
}
};

@Test
public void testSimple() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.range(1, 10).mapIO(ADD_ONE).subscribe(ts);

ts.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
ts.assertCompleted();
ts.assertNoErrors();
}

@Test
public void testSimpleBackpressure() {
TestSubscriber<Integer> ts = TestSubscriber.create(3);

Observable.range(1, 10).mapIO(ADD_ONE).subscribe(ts);

ts.assertValues(2, 3, 4);
ts.assertNotCompleted();
ts.assertNoErrors();
}

@Test
public void testThrowsChecked() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.range(1, 10).mapIO(TRHOW_CHECKED).subscribe(ts);

ts.assertValue(2);
ts.assertError(IOException.class);
ts.assertNotCompleted();

}
@Test
public void testThrowsUnchecked() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.range(1, 10).mapIO(TRHOW_UNCHECKED).subscribe(ts);

ts.assertValue(2);
ts.assertError(TestException.class);
ts.assertNotCompleted();

}

@Test(expected = OnErrorNotImplementedException.class)
public void testFatalBubblesUp() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.range(1, 10).mapIO(TRHOW_FATAL).subscribe(ts);
}

}