Permalink
Browse files

RxJava 0.18

  • Loading branch information...
1 parent 65ad7ec commit f39fcdcbcdb8426454f258ea311323b9379dbaf9 @benjchristensen benjchristensen committed Apr 24, 2014
@@ -18,7 +18,7 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import rx.Observable;
import rx.Observer;
-import rx.util.functions.Action1;
+import rx.functions.Action1;
import static com.netflix.hystrix.contrib.javanica.CommonUtils.getHystrixCommandByKey;
import static org.junit.Assert.assertEquals;
@@ -4,7 +4,7 @@ apply plugin: 'idea'
dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
- compile 'com.netflix.rxjava:rxjava-core:0.17.3'
+ compile 'com.netflix.rxjava:rxjava-core:0.18.1'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
testCompile 'junit:junit-dep:4.10'
@@ -649,7 +649,7 @@ protected TryableSemaphore getExecutionSemaphore() {
@Override
public void call(Subscriber<? super R> observer) {
- originalObservable.subscribe(observer);
+ originalObservable.unsafeSubscribe(observer);
}
});
this.command = command;
@@ -673,7 +673,7 @@ public void call(Subscriber<? super R> observer) {
@Override
public void call(final Subscriber<? super R> observer) {
- actual.subscribe(observer);
+ actual.unsafeSubscribe(observer);
}
}, command);
this.originalCommand = command;
@@ -315,22 +315,22 @@ public void call() {
executionSemaphore.release();
}
- }).subscribe(observer);
+ }).unsafeSubscribe(observer);
} catch (RuntimeException e) {
observer.onError(e);
}
} else {
metrics.markSemaphoreRejection();
logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
// retrieve a fallback or throw an exception if no fallback available
- getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, "could not acquire a semaphore for execution").subscribe(observer);
+ getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, "could not acquire a semaphore for execution").unsafeSubscribe(observer);
}
} else {
// record that we are returning a short-circuited fallback
metrics.markShortCircuited();
// short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
try {
- getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited").subscribe(observer);
+ getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited").unsafeSubscribe(observer);
} catch (Exception e) {
observer.onError(e);
}
@@ -441,11 +441,11 @@ public void call() {
executionHook.onThreadComplete(_self);
endCurrentThread.call();
}
- }).subscribe(s);
+ }).unsafeSubscribe(s);
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
- Observable.<R> error(t).subscribe(s);
+ Observable.<R> error(t).unsafeSubscribe(s);
}
}
}
@@ -798,7 +798,7 @@ public void run() {
* THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
* should not perform IO and thus we run on the computation event loops.
*/
- v.subscribeOn(new HystrixContextScheduler(originalCommand.concurrencyStrategy, Schedulers.computation())).subscribe(child);
+ v.subscribeOn(new HystrixContextScheduler(originalCommand.concurrencyStrategy, Schedulers.computation())).unsafeSubscribe(child);
} catch (HystrixRuntimeException re) {
child.onError(re);
}
@@ -16,14 +16,11 @@
package com.netflix.hystrix.strategy.concurrency;
import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-import rx.Scheduler.Inner;
-import rx.functions.Action1;
+import rx.functions.Action0;
import rx.functions.Func2;
import com.netflix.hystrix.strategy.HystrixPlugins;
-import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.HystrixContextInnerScheduler;
/**
* Wrapper around {@link Func2} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Func2}
@@ -33,30 +30,17 @@
*
* @ExcludeFromJavadoc
*/
-public class HystrixContexSchedulerAction implements Action1<Inner> {
+public class HystrixContexSchedulerAction implements Action0 {
- private final Action1<Inner> actual;
+ private final Action0 actual;
private final HystrixRequestContext parentThreadState;
private final Callable<Void> c;
- /*
- * This is a workaround to needing to use Callable<Void> but
- * needing to pass `Inner t1` into it after construction.
- *
- * Think of it like sticking t1 on the stack and then calling the function
- * that uses them.
- *
- * This should all be thread-safe without issues despite multi-step execution
- * because this Action0 is only ever executed once by Hystrix and construction will always
- * precede `call` being invoked once.
- */
- private final AtomicReference<Inner> t1Holder = new AtomicReference<Inner>();
-
- public HystrixContexSchedulerAction(Action1<Inner> action) {
+ public HystrixContexSchedulerAction(Action0 action) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), action);
}
- public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action1<Inner> action) {
+ public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action0 action) {
this.actual = action;
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
@@ -68,8 +52,8 @@ public Void call() throws Exception {
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
- // execute actual Action1<Inner> with the state of the parent
- actual.call(new HystrixContextInnerScheduler(concurrencyStrategy, t1Holder.get()));
+ // execute actual Action0 with the state of the parent
+ actual.call();
return null;
} finally {
// restore this thread back to its original state
@@ -80,12 +64,11 @@ public Void call() throws Exception {
}
@Override
- public void call(Inner inner) {
+ public void call() {
try {
- this.t1Holder.set(inner);
c.call();
} catch (Exception e) {
- throw new RuntimeException("Failed executing wrapped Func2", e);
+ throw new RuntimeException("Failed executing wrapped Action0", e);
}
}
@@ -17,12 +17,14 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import rx.Scheduler;
import rx.Subscription;
-import rx.functions.Action1;
-import rx.schedulers.Schedulers;
+import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
+import rx.subscriptions.CompositeSubscription;
+import rx.subscriptions.Subscriptions;
import com.netflix.hystrix.HystrixThreadPool;
import com.netflix.hystrix.strategy.HystrixPlugins;
@@ -42,7 +44,7 @@ public HystrixContextScheduler(Scheduler scheduler) {
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.threadPool = null;
}
-
+
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Scheduler scheduler) {
this.actualScheduler = scheduler;
this.concurrencyStrategy = concurrencyStrategy;
@@ -52,26 +54,22 @@ public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, S
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
- this.actualScheduler = Schedulers.executor(threadPool.getExecutor());
+ this.actualScheduler = new ThreadPoolScheduler(threadPool);
}
@Override
- public Subscription schedule(Action1<Inner> action) {
- InnerHystrixContextScheduler inner = new InnerHystrixContextScheduler();
- inner.schedule(action);
- return inner;
+ public Worker createWorker() {
+ return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
- @Override
- public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
- InnerHystrixContextScheduler inner = new InnerHystrixContextScheduler();
- inner.schedule(action, delayTime, unit);
- return inner;
- }
-
- private class InnerHystrixContextScheduler extends Inner {
+ private class HystrixContextSchedulerWorker extends Worker {
private BooleanSubscription s = new BooleanSubscription();
+ private final Worker worker;
+
+ private HystrixContextSchedulerWorker(Worker actualWorker) {
+ this.worker = actualWorker;
+ }
@Override
public void unsubscribe() {
@@ -84,56 +82,108 @@ public boolean isUnsubscribed() {
}
@Override
- public void schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
+ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
- actualScheduler.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
+ return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
@Override
- public void schedule(Action1<Inner> action) {
+ public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
- actualScheduler.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
+ return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
}
- public static class HystrixContextInnerScheduler extends Inner {
+ private static class ThreadPoolScheduler extends Scheduler {
+
+ private final HystrixThreadPool threadPool;
+
+ public ThreadPoolScheduler(HystrixThreadPool threadPool) {
+ this.threadPool = threadPool;
+ }
+
+ @Override
+ public Worker createWorker() {
+ return new ThreadPoolWorker(threadPool);
+ }
- private final HystrixConcurrencyStrategy concurrencyStrategy;
- private final Inner actual;
+ }
- HystrixContextInnerScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Inner actual) {
- this.concurrencyStrategy = concurrencyStrategy;
- this.actual = actual;
+ /**
+ * Purely for scheduling work on a thread-pool.
+ * <p>
+ * This is not natively supported by RxJava as of 0.18.0 because thread-pools
+ * are contrary to sequential execution.
+ * <p>
+ * For the Hystrix case, each Command invocation has a single action so the concurrency
+ * issue is not a problem.
+ */
+ private static class ThreadPoolWorker extends Worker {
+
+ private final HystrixThreadPool threadPool;
+ private final CompositeSubscription subscription = new CompositeSubscription();
+
+ public ThreadPoolWorker(HystrixThreadPool threadPool) {
+ this.threadPool = threadPool;
}
@Override
public void unsubscribe() {
- actual.unsubscribe();
+ subscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
- return actual.isUnsubscribed();
+ return subscription.isUnsubscribed();
}
@Override
- public void schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
- actual.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
+ public Subscription schedule(final Action0 action) {
+ if (subscription.isUnsubscribed()) {
+ // don't schedule, we are unsubscribed
+ return Subscriptions.empty();
+ }
+
+ final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
+ Subscription s = Subscriptions.from(threadPool.getExecutor().submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ if (subscription.isUnsubscribed()) {
+ return;
+ }
+ action.call();
+ } finally {
+ // remove the subscription now that we're completed
+ Subscription s = sf.get();
+ if (s != null) {
+ subscription.remove(s);
+ }
+ }
+ }
+ }));
+
+ sf.set(s);
+ subscription.add(s);
+ return s;
}
@Override
- public void schedule(Action1<Inner> action) {
- actual.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
+ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
+ System.out.println("delayed scheduling");
+ throw new IllegalStateException("Hystrix does not support delayed scheduling");
}
}
+
}
@@ -23,7 +23,7 @@
import rx.Observable;
import rx.Observer;
-import rx.util.functions.Action1;
+import rx.functions.Action1;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

0 comments on commit f39fcdc

Please sign in to comment.