diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java index 1a5a0907b..47badcae2 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java @@ -22,6 +22,7 @@ import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixRequestEvents; import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization; import com.netflix.hystrix.metric.HystrixRequestEventsStream; +import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; import io.reactivesocket.Payload; import rx.Observable; diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java index 277f344b6..c64e3dcaa 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java @@ -25,7 +25,7 @@ import com.netflix.hystrix.HystrixEventType; import com.netflix.hystrix.HystrixThreadPoolKey; import com.netflix.hystrix.HystrixThreadPoolMetrics; -import com.netflix.hystrix.contrib.reactivesocket.HystrixDashboardStream; +import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; import org.agrona.LangUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java similarity index 74% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStream.java rename to hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java index 370d9be87..eb47253ce 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStream.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStream.java @@ -13,12 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.reactivesocket; +package com.netflix.hystrix.metric.consumer; import com.netflix.hystrix.HystrixCollapserMetrics; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixThreadPoolMetrics; import rx.Observable; +import rx.functions.Action0; +import rx.functions.Func1; import java.util.Collection; import java.util.concurrent.TimeUnit; @@ -32,13 +34,28 @@ public class HystrixDashboardStream { private HystrixDashboardStream(int delayInMs) { this.delayInMs = delayInMs; this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS) - .map(timestamp -> new DashboardData( - HystrixCommandMetrics.getInstances(), - HystrixThreadPoolMetrics.getInstances(), - HystrixCollapserMetrics.getInstances() - )) - .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true)) - .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false)) + .map(new Func1() { + @Override + public DashboardData call(Long timestamp) { + return new DashboardData( + HystrixCommandMetrics.getInstances(), + HystrixThreadPoolMetrics.getInstances(), + HystrixCollapserMetrics.getInstances() + ); + } + }) + .doOnSubscribe(new Action0() { + @Override + public void call() { + isSourceCurrentlySubscribed.set(true); + } + }) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + isSourceCurrentlySubscribed.set(false); + } + }) .share() .onBackpressureDrop(); } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStreamTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStreamTest.java similarity index 66% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStreamTest.java rename to hystrix-core/src/test/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStreamTest.java index 2c4e1e822..dc95009e9 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStreamTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/metric/consumer/HystrixDashboardStreamTest.java @@ -13,17 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.netflix.hystrix.contrib.reactivesocket; +package com.netflix.hystrix.metric.consumer; +import com.hystrix.junit.HystrixRequestContextRule; import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.metric.CommandStreamTest; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import rx.Observable; import rx.Subscriber; import rx.Subscription; -import rx.functions.Actions; +import rx.functions.Action0; +import rx.functions.Func1; +import rx.functions.Func2; import rx.schedulers.Schedulers; import java.util.concurrent.CountDownLatch; @@ -34,9 +41,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class HystrixDashboardStreamTest extends HystrixStreamTest { +public class HystrixDashboardStreamTest extends CommandStreamTest { + + @Rule + public HystrixRequestContextRule ctx = new HystrixRequestContextRule(); HystrixDashboardStream stream; + private final static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Dashboard"); + private final static HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("DashboardCommand"); @Before public void init() { @@ -46,26 +58,37 @@ public void init() { @Test public void testStreamHasData() throws Exception { final AtomicBoolean commandShowsUp = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); final int NUM = 10; for (int i = 0; i < 2; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); cmd.observe(); } - stream.observe().take(NUM).subscribe(dashboardData -> { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands"); - for (HystrixCommandMetrics metrics: dashboardData.commandMetrics) { - if (metrics.getCommandKey().name().equals("SyntheticBlockingCommand")) { - commandShowsUp.set(true); + stream.observe().take(NUM).subscribe( + new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(HystrixDashboardStream.DashboardData dashboardData) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands"); + for (HystrixCommandMetrics metrics : dashboardData.commandMetrics) { + if (metrics.getCommandKey().equals(commandKey)) { + commandShowsUp.set(true); + } } } - }, - Actions.empty(), - () -> { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted"); - latch.countDown(); }); assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); @@ -74,15 +97,20 @@ public void testStreamHasData() throws Exception { @Test public void testTwoSubscribersOneUnsubscribes() throws Exception { - CountDownLatch latch1 = new CountDownLatch(1); - CountDownLatch latch2 = new CountDownLatch(1); - AtomicInteger payloads1 = new AtomicInteger(0); - AtomicInteger payloads2 = new AtomicInteger(0); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final AtomicInteger payloads1 = new AtomicInteger(0); + final AtomicInteger payloads2 = new AtomicInteger(0); Subscription s1 = stream .observe() .take(100) - .doOnUnsubscribe(latch1::countDown) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch1.countDown(); + } + }) .subscribe(new Subscriber() { @Override public void onCompleted() { @@ -106,7 +134,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) { Subscription s2 = stream .observe() .take(100) - .doOnUnsubscribe(latch2::countDown) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch2.countDown(); + } + }) .subscribe(new Subscriber() { @Override public void onCompleted() { @@ -128,7 +161,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) { }); //execute 1 command, then unsubscribe from first stream. then execute the rest for (int i = 0; i < 50; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); cmd.execute(); if (i == 1) { s1.unsubscribe(); @@ -145,15 +178,20 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) { @Test public void testTwoSubscribersBothUnsubscribe() throws Exception { - CountDownLatch latch1 = new CountDownLatch(1); - CountDownLatch latch2 = new CountDownLatch(1); - AtomicInteger payloads1 = new AtomicInteger(0); - AtomicInteger payloads2 = new AtomicInteger(0); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final AtomicInteger payloads1 = new AtomicInteger(0); + final AtomicInteger payloads2 = new AtomicInteger(0); Subscription s1 = stream .observe() .take(10) - .doOnUnsubscribe(latch1::countDown) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch1.countDown(); + } + }) .subscribe(new Subscriber() { @Override public void onCompleted() { @@ -177,7 +215,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) { Subscription s2 = stream .observe() .take(10) - .doOnUnsubscribe(latch2::countDown) + .doOnUnsubscribe(new Action0() { + @Override + public void call() { + latch2.countDown(); + } + }) .subscribe(new Subscriber() { @Override public void onCompleted() { @@ -199,7 +242,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) { }); //execute half the commands, then unsubscribe from both streams, then execute the rest for (int i = 0; i < 50; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); cmd.execute(); if (i == 25) { s1.unsubscribe(); @@ -217,8 +260,8 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) { @Test public void testTwoSubscribersOneSlowOneFast() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean foundError = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean foundError = new AtomicBoolean(false); Observable fast = stream .observe() @@ -226,16 +269,24 @@ public void testTwoSubscribersOneSlowOneFast() throws Exception { Observable slow = stream .observe() .observeOn(Schedulers.newThread()) - .map(n -> { - try { - Thread.sleep(100); - return n; - } catch (InterruptedException ex) { - return n; + .map(new Func1() { + @Override + public HystrixDashboardStream.DashboardData call(HystrixDashboardStream.DashboardData n) { + try { + Thread.sleep(100); + return n; + } catch (InterruptedException ex) { + return n; + } } }); - Observable checkZippedEqual = Observable.zip(fast, slow, (payload, payload2) -> payload == payload2); + Observable checkZippedEqual = Observable.zip(fast, slow, new Func2() { + @Override + public Boolean call(HystrixDashboardStream.DashboardData payload, HystrixDashboardStream.DashboardData payload2) { + return payload == payload2; + } + }); Subscription s1 = checkZippedEqual .take(10000) @@ -261,7 +312,7 @@ public void onNext(Boolean b) { }); for (int i = 0; i < 50; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); + HystrixCommand cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50); cmd.execute(); }