Skip to content

Commit

Permalink
Merge pull request #1254 from mattrjacobs/stream-sharing
Browse files Browse the repository at this point in the history
Allow multiple consumers of sample data to only trigger work once
  • Loading branch information
mattrjacobs committed Jun 22, 2016
2 parents 7c9e48b + e712bb7 commit 2747e91
Show file tree
Hide file tree
Showing 12 changed files with 868 additions and 122 deletions.
Expand Up @@ -18,8 +18,8 @@
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import rx.Observable;
import rx.functions.Func1;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -49,25 +49,16 @@ public class HystrixConfigSseServlet extends HystrixSampleSseServlet<HystrixConf

private static final long serialVersionUID = -3599771169762858235L;

private static final int DEFAULT_ONNEXT_DELAY_IN_MS = 10000;

private final HystrixConfigurationJsonStream jsonStream;

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixConfigSseServlet() {
this.jsonStream = new HystrixConfigurationJsonStream();
}

/* package-private */ HystrixConfigSseServlet(Func1<Integer, Observable<HystrixConfiguration>> createStream) {
this.jsonStream = new HystrixConfigurationJsonStream(createStream);
super(HystrixConfigurationStream.getInstance().observe());
}

@Override
int getDefaultDelayInMilliseconds() {
return DEFAULT_ONNEXT_DELAY_IN_MS;
/* package-private */ HystrixConfigSseServlet(Observable<HystrixConfiguration> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
}

@Override
Expand All @@ -90,11 +81,6 @@ protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

@Override
protected Observable<HystrixConfiguration> getStream(int delay) {
return jsonStream.observe(delay);
}

@Override
protected String convertToString(HystrixConfiguration config) throws IOException {
return HystrixConfigurationJsonStream.convertToString(config);
Expand Down
Expand Up @@ -46,15 +46,17 @@ public class HystrixConfigurationJsonStream {
private static final JsonFactory jsonFactory = new JsonFactory();
private final Func1<Integer, Observable<HystrixConfiguration>> streamGenerator;

@Deprecated //since 1.5.4
public HystrixConfigurationJsonStream() {
this.streamGenerator = new Func1<Integer, Observable<HystrixConfiguration>>() {
@Override
public Observable<HystrixConfiguration> call(Integer delay) {
return new HystrixConfigurationStream(delay).observe();
return HystrixConfigurationStream.getInstance().observe();
}
};
}

@Deprecated //since 1.5.4
public HystrixConfigurationJsonStream(Func1<Integer, Observable<HystrixConfiguration>> streamGenerator) {
this.streamGenerator = streamGenerator;
}
Expand Down Expand Up @@ -171,10 +173,25 @@ public static String convertToString(HystrixConfiguration config) throws IOExcep
return jsonString.getBuffer().toString();
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixConfigurationStream#observe()}
* @param delay interval between data emissions
* @return sampled utilization as Java object, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<HystrixConfiguration> observe(int delay) {
return streamGenerator.call(delay);
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixConfigurationStream#observe()}
* and you can map to JSON string via {@link HystrixConfigurationJsonStream#convertToString(HystrixConfiguration)}
* @param delay interval between data emissions
* @return sampled utilization as JSON string, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<String> observeJson(int delay) {
return streamGenerator.call(delay).map(convertToJson);
}
Expand Down
Expand Up @@ -33,15 +33,27 @@
/**
*/
public abstract class HystrixSampleSseServlet<SampleData> extends HttpServlet {
protected final Observable<SampleData> sampleStream;

private static final Logger logger = LoggerFactory.getLogger(HystrixSampleSseServlet.class);

//wake up occasionally and check that poller is still alive. this value controls how often
private static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;

private final int pausePollerThreadDelayInMs;

/* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */
private static volatile boolean isDestroyed = false;

private static final String DELAY_REQ_PARAM_NAME = "delay";
protected HystrixSampleSseServlet(Observable<SampleData> sampleStream) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS;
}

abstract int getDefaultDelayInMilliseconds();
protected HystrixSampleSseServlet(Observable<SampleData> sampleStream, int pausePollerThreadDelayInMs) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
}

abstract int getMaxNumberConcurrentConnectionsAllowed();

Expand All @@ -51,7 +63,7 @@ public abstract class HystrixSampleSseServlet<SampleData> extends HttpServlet {

protected abstract void decrementCurrentConcurrentConnections();

protected abstract Observable<SampleData> getStream(int delay);
//protected abstract Observable<SampleData> getStream();

protected abstract String convertToString(SampleData sampleData) throws IOException;

Expand All @@ -67,19 +79,6 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
}
}

/* package-private */
int getDelayFromHttpRequest(HttpServletRequest req) {
try {
String delay = req.getParameter(DELAY_REQ_PARAM_NAME);
if (delay != null) {
return Math.max(Integer.parseInt(delay), 1);
}
} catch (Throwable ex) {
//silently fail
}
return getDefaultDelayInMilliseconds();
}

/**
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
Expand Down Expand Up @@ -125,20 +124,18 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse
if (numberConnections > maxNumberConnectionsAllowed) {
response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);
} else {
int delay = getDelayFromHttpRequest(request);

/* initialize response */
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

final PrintWriter writer = response.getWriter();

Observable<SampleData> sampledStream = getStream(delay);
//Observable<SampleData> sampledStream = getStream();

//since the sample stream is based on Observable.interval, events will get published on an RxComputation thread
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
sampleSubscription = sampledStream
sampleSubscription = sampleStream
.observeOn(Schedulers.io())
.subscribe(new Subscriber<SampleData>() {
@Override
Expand Down Expand Up @@ -180,7 +177,7 @@ public void onNext(SampleData sampleData) {

while (moreDataWillBeSent.get() && !isDestroyed) {
try {
Thread.sleep(delay);
Thread.sleep(pausePollerThreadDelayInMs);
} catch (InterruptedException e) {
moreDataWillBeSent.set(false);
}
Expand Down
Expand Up @@ -55,15 +55,17 @@ public String call(HystrixUtilization utilization) {
}
};

@Deprecated //since 1.5.4
public HystrixUtilizationJsonStream() {
this.streamGenerator = new Func1<Integer, Observable<HystrixUtilization>>() {
@Override
public Observable<HystrixUtilization> call(Integer delay) {
return new HystrixUtilizationStream(delay).observe();
return HystrixUtilizationStream.getInstance().observe();
}
};
}

@Deprecated //since 1.5.4
public HystrixUtilizationJsonStream(Func1<Integer, Observable<HystrixUtilization>> streamGenerator) {
this.streamGenerator = streamGenerator;
}
Expand Down Expand Up @@ -111,10 +113,24 @@ protected static String convertToJson(HystrixUtilization utilization) throws IOE
return jsonString.getBuffer().toString();
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixUtilizationStream#observe()}
* @param delay interval between data emissions
* @return sampled utilization as Java object, taken on a timer
*/
@Deprecated //deprecated as of 1.5.4
public Observable<HystrixUtilization> observe(int delay) {
return streamGenerator.call(delay);
}

/**
* @deprecated Not for public use. Using the delay param prevents streams from being efficiently shared.
* Please use {@link HystrixUtilizationStream#observe()}
* and the {@link #convertToJson(HystrixUtilization)} method
* @param delay interval between data emissions
* @return sampled utilization as JSON string, taken on a timer
*/
public Observable<String> observeJson(int delay) {
return streamGenerator.call(delay).map(convertToJsonFunc);
}
Expand Down
Expand Up @@ -18,8 +18,8 @@
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.metric.sample.HystrixUtilization;
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
import rx.Observable;
import rx.functions.Func1;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -49,27 +49,17 @@ public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet<Hystri

private static final long serialVersionUID = -7812908330777694972L;

private static final int DEFAULT_ONNEXT_DELAY_IN_MS = 100;

private final HystrixUtilizationJsonStream jsonStream;

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections =
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixUtilizationSseServlet() {
this.jsonStream = new HystrixUtilizationJsonStream();

super(HystrixUtilizationStream.getInstance().observe());
}

/* package-private */ HystrixUtilizationSseServlet(Func1<Integer, Observable<HystrixUtilization>> createStream) {
this.jsonStream = new HystrixUtilizationJsonStream(createStream);
}

@Override
int getDefaultDelayInMilliseconds() {
return DEFAULT_ONNEXT_DELAY_IN_MS;
/* package-private */ HystrixUtilizationSseServlet(Observable<HystrixUtilization> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
}

@Override
Expand All @@ -92,11 +82,6 @@ protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

@Override
protected Observable<HystrixUtilization> getStream(int delay) {
return jsonStream.observe(delay);
}

@Override
protected String convertToString(HystrixUtilization utilization) throws IOException {
return HystrixUtilizationJsonStream.convertToJson(utilization);
Expand Down
Expand Up @@ -16,7 +16,6 @@
package com.netflix.hystrix.contrib.sample.stream;

import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -58,15 +57,6 @@ public HystrixConfiguration call(Long timestamp) {
}
});

private Func1<Integer, Observable<HystrixConfiguration>> generateStream(final Observable<HystrixConfiguration> o) {
return new Func1<Integer, Observable<HystrixConfiguration>>() {
@Override
public Observable<HystrixConfiguration> call(Integer integer) {
return o;
}
};
}

private final Observable<HystrixConfiguration> streamOfOnNextThenOnError = Observable.create(new Observable.OnSubscribe<HystrixConfiguration>() {
@Override
public void call(Subscriber<? super HystrixConfiguration> subscriber) {
Expand Down Expand Up @@ -109,7 +99,7 @@ public void tearDown() {

@Test
public void shutdownServletShouldRejectRequests() throws ServletException, IOException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts));
servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand All @@ -126,7 +116,7 @@ public void shutdownServletShouldRejectRequests() throws ServletException, IOExc

@Test
public void testConfigDataWithInfiniteOnNextStream() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts));
servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand Down Expand Up @@ -182,13 +172,13 @@ public void run() {
Thread.sleep(100);

System.out.println("WRITES : " + writes.get());
assertEquals(9, writes.get());
assertTrue(writes.get() >= 9);
assertEquals(0, servlet.getNumberCurrentConnections());
}

@Test
public void testConfigDataWithStreamOnError() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnError));
servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnError, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand Down Expand Up @@ -241,7 +231,7 @@ public void run() {

@Test
public void testConfigDataWithStreamOnCompleted() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnCompleted));
servlet = new HystrixConfigSseServlet(streamOfOnNextThenOnCompleted, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand Down Expand Up @@ -294,7 +284,7 @@ public void run() {

@Test
public void testConfigDataWithIoExceptionOnWrite() throws IOException, InterruptedException {
servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts));
servlet = new HystrixConfigSseServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {
Expand All @@ -303,7 +293,6 @@ public void testConfigDataWithIoExceptionOnWrite() throws IOException, Interrupt

final AtomicInteger writes = new AtomicInteger(0);

when(mockReq.getParameter("delay")).thenReturn("100");
when(mockResp.getWriter()).thenReturn(mockPrintWriter);
Mockito.doAnswer(new Answer<Void>() {
@Override
Expand Down
Expand Up @@ -32,10 +32,6 @@
import java.util.function.Supplier;

class EventStream implements Supplier<Observable<Payload>> {

private final static int CONFIGURATION_DATA_INTERVAL_IN_MS = 500;
private final static int UTILIZATION_DATA_INTERVAL_IN_MS = 500;

private final Observable<Payload> source;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

Expand All @@ -57,7 +53,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) {

switch (eventStreamEnum) {
case CONFIG_STREAM:
source = new HystrixConfigurationStream(CONFIGURATION_DATA_INTERVAL_IN_MS)
source = HystrixConfigurationStream.getInstance()
.observe()
.map(SerialHystrixConfiguration::toBytes)
.map(SerialHystrixMetric::toPayload);
Expand All @@ -69,7 +65,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) {
.map(SerialHystrixMetric::toPayload);
break;
case UTILIZATION_STREAM:
source = new HystrixUtilizationStream(UTILIZATION_DATA_INTERVAL_IN_MS)
source = HystrixUtilizationStream.getInstance()
.observe()
.map(SerialHystrixUtilization::toBytes)
.map(SerialHystrixMetric::toPayload);
Expand Down

0 comments on commit 2747e91

Please sign in to comment.