Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move HystrixDashboardStream to hystrix-core #1246

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,6 +22,7 @@
import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixRequestEvents;
import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization;
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
import io.reactivesocket.Payload;
import rx.Observable;
Expand Down
Expand Up @@ -25,7 +25,7 @@
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.contrib.reactivesocket.HystrixDashboardStream;
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import org.agrona.LangUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Expand Up @@ -13,12 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.reactivesocket;
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.HystrixCollapserMetrics;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,13 +34,28 @@ public class HystrixDashboardStream {
private HystrixDashboardStream(int delayInMs) {
this.delayInMs = delayInMs;
this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
.map(timestamp -> new DashboardData(
HystrixCommandMetrics.getInstances(),
HystrixThreadPoolMetrics.getInstances(),
HystrixCollapserMetrics.getInstances()
))
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.map(new Func1<Long, DashboardData>() {
@Override
public DashboardData call(Long timestamp) {
return new DashboardData(
HystrixCommandMetrics.getInstances(),
HystrixThreadPoolMetrics.getInstances(),
HystrixCollapserMetrics.getInstances()
);
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
Expand Down
Expand Up @@ -13,17 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.reactivesocket;
package com.netflix.hystrix.metric.consumer;

import com.hystrix.junit.HystrixRequestContextRule;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.metric.CommandStreamTest;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Actions;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
Expand All @@ -34,9 +41,14 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class HystrixDashboardStreamTest extends HystrixStreamTest {
public class HystrixDashboardStreamTest extends CommandStreamTest {

@Rule
public HystrixRequestContextRule ctx = new HystrixRequestContextRule();

HystrixDashboardStream stream;
private final static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Dashboard");
private final static HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("DashboardCommand");

@Before
public void init() {
Expand All @@ -46,26 +58,37 @@ public void init() {
@Test
public void testStreamHasData() throws Exception {
final AtomicBoolean commandShowsUp = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final int NUM = 10;

for (int i = 0; i < 2; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.observe();
}

stream.observe().take(NUM).subscribe(dashboardData -> {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands");
for (HystrixCommandMetrics metrics: dashboardData.commandMetrics) {
if (metrics.getCommandKey().name().equals("SyntheticBlockingCommand")) {
commandShowsUp.set(true);
stream.observe().take(NUM).subscribe(
new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e);
latch.countDown();
}

@Override
public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands");
for (HystrixCommandMetrics metrics : dashboardData.commandMetrics) {
if (metrics.getCommandKey().equals(commandKey)) {
commandShowsUp.set(true);
}
}
}
},
Actions.empty(),
() -> {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
latch.countDown();
});

assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
Expand All @@ -74,15 +97,20 @@ public void testStreamHasData() throws Exception {

@Test
public void testTwoSubscribersOneUnsubscribes() throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicInteger payloads1 = new AtomicInteger(0);
AtomicInteger payloads2 = new AtomicInteger(0);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final AtomicInteger payloads1 = new AtomicInteger(0);
final AtomicInteger payloads2 = new AtomicInteger(0);

Subscription s1 = stream
.observe()
.take(100)
.doOnUnsubscribe(latch1::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch1.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -106,7 +134,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
Subscription s2 = stream
.observe()
.take(100)
.doOnUnsubscribe(latch2::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch2.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -128,7 +161,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
});
//execute 1 command, then unsubscribe from first stream. then execute the rest
for (int i = 0; i < 50; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.execute();
if (i == 1) {
s1.unsubscribe();
Expand All @@ -145,15 +178,20 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {

@Test
public void testTwoSubscribersBothUnsubscribe() throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicInteger payloads1 = new AtomicInteger(0);
AtomicInteger payloads2 = new AtomicInteger(0);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final AtomicInteger payloads1 = new AtomicInteger(0);
final AtomicInteger payloads2 = new AtomicInteger(0);

Subscription s1 = stream
.observe()
.take(10)
.doOnUnsubscribe(latch1::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch1.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -177,7 +215,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
Subscription s2 = stream
.observe()
.take(10)
.doOnUnsubscribe(latch2::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch2.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -199,7 +242,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
});
//execute half the commands, then unsubscribe from both streams, then execute the rest
for (int i = 0; i < 50; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.execute();
if (i == 25) {
s1.unsubscribe();
Expand All @@ -217,25 +260,33 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {

@Test
public void testTwoSubscribersOneSlowOneFast() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean foundError = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean foundError = new AtomicBoolean(false);

Observable<HystrixDashboardStream.DashboardData> fast = stream
.observe()
.observeOn(Schedulers.newThread());
Observable<HystrixDashboardStream.DashboardData> slow = stream
.observe()
.observeOn(Schedulers.newThread())
.map(n -> {
try {
Thread.sleep(100);
return n;
} catch (InterruptedException ex) {
return n;
.map(new Func1<HystrixDashboardStream.DashboardData, HystrixDashboardStream.DashboardData>() {
@Override
public HystrixDashboardStream.DashboardData call(HystrixDashboardStream.DashboardData n) {
try {
Thread.sleep(100);
return n;
} catch (InterruptedException ex) {
return n;
}
}
});

Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, (payload, payload2) -> payload == payload2);
Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, new Func2<HystrixDashboardStream.DashboardData, HystrixDashboardStream.DashboardData, Boolean>() {
@Override
public Boolean call(HystrixDashboardStream.DashboardData payload, HystrixDashboardStream.DashboardData payload2) {
return payload == payload2;
}
});

Subscription s1 = checkZippedEqual
.take(10000)
Expand All @@ -261,7 +312,7 @@ public void onNext(Boolean b) {
});

for (int i = 0; i < 50; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.execute();
}

Expand Down