From f6e1eb87b86e8f8d78f99170c18078f53dd6dfd4 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Wed, 18 May 2016 16:20:08 -0700 Subject: [PATCH 1/7] initial commit ReactiveSocket event streams --- .../reactivesocket/EventStreamEnum.java | 5 + .../EventStreamRequestHandler.java | 7 + .../reactivesocket/StreamingSupplier.java | 7 + .../HystrixCollasperMetricsStream.java | 107 ++++++++ .../metrics/HystrixCommandMetricsStream.java | 239 ++++++++++++++++++ .../HystrixThreadPoolMetricsStream.java | 94 +++++++ .../requests/HystrixRequestEventsStream.java | 94 +++++++ .../sample/HystrixConfigStream.java | 7 + .../sample/HystrixUtilizationStream.java | 172 +++++++++++++ 9 files changed, 732 insertions(+) create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java new file mode 100644 index 000000000..747ffbe48 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -0,0 +1,5 @@ +package com.netflix.hystrix.contrib.reactivesocket; + + +public enum EventStreamEnum { +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java new file mode 100644 index 000000000..40df6f625 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -0,0 +1,7 @@ +package com.netflix.hystrix.contrib.reactivesocket; + +/** + * Created by rroeser on 5/17/16. + */ +public class EventStreamRequestHandler { +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java new file mode 100644 index 000000000..1fa93ebaf --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java @@ -0,0 +1,7 @@ +package com.netflix.hystrix.contrib.reactivesocket; + +/** + * Created by rroeser on 5/18/16. + */ +public class StreamingSupplier { +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java new file mode 100644 index 000000000..8c6001bcc --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java @@ -0,0 +1,107 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import org.agrona.LangUtil; +import rx.functions.Func0; + +import java.io.ByteArrayOutputStream; +import java.util.stream.Stream; + +public class HystrixCollasperMetricsStream extends StreamingSupplier { + private static HystrixCollasperMetricsStream INSTANCE = new HystrixCollasperMetricsStream(); + + private HystrixCollasperMetricsStream() { + super(); + } + + public static HystrixCollasperMetricsStream getInstance() { + return INSTANCE; + } + + @Override + protected Stream getStream() { + return HystrixCollapserMetrics.getInstances().stream(); + } + + protected byte[] getPayloadData(final HystrixCollapserMetrics collapserMetrics) { + byte[] retVal = null; + try { + HystrixCollapserKey key = collapserMetrics.getCollapserKey(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + json.writeStartObject(); + + json.writeStringField("type", "HystrixCollapser"); + json.writeStringField("name", key.name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + safelyWriteNumberField(json, "rollingCountRequestsBatched", new Func0() { + @Override + public Long call() { + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH); + } + }); + safelyWriteNumberField(json, "rollingCountBatches", new Func0() { + @Override + public Long call() { + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED); + } + }); + safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { + @Override + public Long call() { + return collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE); + } + }); + + // batch size percentiles + json.writeNumberField("batchSize_mean", collapserMetrics.getBatchSizeMean()); + json.writeObjectFieldStart("batchSize"); + json.writeNumberField("25", collapserMetrics.getBatchSizePercentile(25)); + json.writeNumberField("50", collapserMetrics.getBatchSizePercentile(50)); + json.writeNumberField("75", collapserMetrics.getBatchSizePercentile(75)); + json.writeNumberField("90", collapserMetrics.getBatchSizePercentile(90)); + json.writeNumberField("95", collapserMetrics.getBatchSizePercentile(95)); + json.writeNumberField("99", collapserMetrics.getBatchSizePercentile(99)); + json.writeNumberField("99.5", collapserMetrics.getBatchSizePercentile(99.5)); + json.writeNumberField("100", collapserMetrics.getBatchSizePercentile(100)); + json.writeEndObject(); + + // shard size percentiles (commented-out for now) + //json.writeNumberField("shardSize_mean", collapserMetrics.getShardSizeMean()); + //json.writeObjectFieldStart("shardSize"); + //json.writeNumberField("25", collapserMetrics.getShardSizePercentile(25)); + //json.writeNumberField("50", collapserMetrics.getShardSizePercentile(50)); + //json.writeNumberField("75", collapserMetrics.getShardSizePercentile(75)); + //json.writeNumberField("90", collapserMetrics.getShardSizePercentile(90)); + //json.writeNumberField("95", collapserMetrics.getShardSizePercentile(95)); + //json.writeNumberField("99", collapserMetrics.getShardSizePercentile(99)); + //json.writeNumberField("99.5", collapserMetrics.getShardSizePercentile(99.5)); + //json.writeNumberField("100", collapserMetrics.getShardSizePercentile(100)); + //json.writeEndObject(); + + //json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", collapserMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); + json.writeBooleanField("propertyValue_requestCacheEnabled", collapserMetrics.getProperties().requestCacheEnabled().get()); + json.writeNumberField("propertyValue_maxRequestsInBatch", collapserMetrics.getProperties().maxRequestsInBatch().get()); + json.writeNumberField("propertyValue_timerDelayInMilliseconds", collapserMetrics.getProperties().timerDelayInMilliseconds().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + +} +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java new file mode 100644 index 000000000..f426452b1 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java @@ -0,0 +1,239 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCircuitBreaker; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandMetrics; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import org.agrona.LangUtil; +import rx.functions.Func0; + +import java.io.ByteArrayOutputStream; +import java.util.stream.Stream; + +public class HystrixCommandMetricsStream extends StreamingSupplier { + private static final HystrixCommandMetricsStream INSTANCE = new HystrixCommandMetricsStream(); + + private HystrixCommandMetricsStream() { + super(); + } + + public static HystrixCommandMetricsStream getInstance() { + return INSTANCE; + } + + @Override + protected Stream getStream() { + return HystrixCommandMetrics.getInstances().stream(); + } + + protected byte[] getPayloadData(final HystrixCommandMetrics commandMetrics) { + byte[] retVal = null; + + try + + { + HystrixCommandKey key = commandMetrics.getCommandKey(); + HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + JsonGenerator json = jsonFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixCommand"); + json.writeStringField("name", key.name()); + json.writeStringField("group", commandMetrics.getCommandGroup().name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + // circuit breaker + if (circuitBreaker == null) { + // circuit breaker is disabled and thus never open + json.writeBooleanField("isCircuitBreakerOpen", false); + } else { + json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen()); + } + HystrixCommandMetrics.HealthCounts healthCounts = commandMetrics.getHealthCounts(); + json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage()); + json.writeNumberField("errorCount", healthCounts.getErrorCount()); + json.writeNumberField("requestCount", healthCounts.getTotalRequests()); + + // rolling counters + safelyWriteNumberField(json, "rollingCountBadRequests", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.BAD_REQUEST); + } + }); + safelyWriteNumberField(json, "rollingCountCollapsedRequests", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.COLLAPSED); + } + }); + safelyWriteNumberField(json, "rollingCountEmit", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.EMIT); + } + }); + safelyWriteNumberField(json, "rollingCountExceptionsThrown", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.EXCEPTION_THROWN); + } + }); + safelyWriteNumberField(json, "rollingCountFailure", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FAILURE); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackEmit", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_EMIT); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackFailure", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_FAILURE); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackMissing", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_MISSING); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackRejection", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_REJECTION); + } + }); + safelyWriteNumberField(json, "rollingCountFallbackSuccess", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS); + } + }); + safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.RESPONSE_FROM_CACHE); + } + }); + safelyWriteNumberField(json, "rollingCountSemaphoreRejected", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.SEMAPHORE_REJECTED); + } + }); + safelyWriteNumberField(json, "rollingCountShortCircuited", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.SHORT_CIRCUITED); + } + }); + safelyWriteNumberField(json, "rollingCountSuccess", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.SUCCESS); + } + }); + safelyWriteNumberField(json, "rollingCountThreadPoolRejected", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.THREAD_POOL_REJECTED); + } + }); + safelyWriteNumberField(json, "rollingCountTimeout", new Func0() { + @Override + public Long call() { + return commandMetrics.getRollingCount(HystrixEventType.TIMEOUT); + } + }); + + json.writeNumberField("currentConcurrentExecutionCount", commandMetrics.getCurrentConcurrentExecutionCount()); + json.writeNumberField("rollingMaxConcurrentExecutionCount", commandMetrics.getRollingMaxConcurrentExecutions()); + + // latency percentiles + json.writeNumberField("latencyExecute_mean", commandMetrics.getExecutionTimeMean()); + json.writeObjectFieldStart("latencyExecute"); + json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0)); + json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25)); + json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50)); + json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75)); + json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90)); + json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95)); + json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99)); + json.writeNumberField("99.5", commandMetrics.getExecutionTimePercentile(99.5)); + json.writeNumberField("100", commandMetrics.getExecutionTimePercentile(100)); + json.writeEndObject(); + // + json.writeNumberField("latencyTotal_mean", commandMetrics.getTotalTimeMean()); + json.writeObjectFieldStart("latencyTotal"); + json.writeNumberField("0", commandMetrics.getTotalTimePercentile(0)); + json.writeNumberField("25", commandMetrics.getTotalTimePercentile(25)); + json.writeNumberField("50", commandMetrics.getTotalTimePercentile(50)); + json.writeNumberField("75", commandMetrics.getTotalTimePercentile(75)); + json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90)); + json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95)); + json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99)); + json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5)); + json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100)); + json.writeEndObject(); + + // property values for reporting what is actually seen by the command rather than what was set somewhere + HystrixCommandProperties commandProperties = commandMetrics.getProperties(); + + json.writeNumberField("propertyValue_circuitBreakerRequestVolumeThreshold", commandProperties.circuitBreakerRequestVolumeThreshold().get()); + json.writeNumberField("propertyValue_circuitBreakerSleepWindowInMilliseconds", commandProperties.circuitBreakerSleepWindowInMilliseconds().get()); + json.writeNumberField("propertyValue_circuitBreakerErrorThresholdPercentage", commandProperties.circuitBreakerErrorThresholdPercentage().get()); + json.writeBooleanField("propertyValue_circuitBreakerForceOpen", commandProperties.circuitBreakerForceOpen().get()); + json.writeBooleanField("propertyValue_circuitBreakerForceClosed", commandProperties.circuitBreakerForceClosed().get()); + json.writeBooleanField("propertyValue_circuitBreakerEnabled", commandProperties.circuitBreakerEnabled().get()); + + json.writeStringField("propertyValue_executionIsolationStrategy", commandProperties.executionIsolationStrategy().get().name()); + json.writeNumberField("propertyValue_executionIsolationThreadTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); + json.writeNumberField("propertyValue_executionTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); + json.writeBooleanField("propertyValue_executionIsolationThreadInterruptOnTimeout", commandProperties.executionIsolationThreadInterruptOnTimeout().get()); + json.writeStringField("propertyValue_executionIsolationThreadPoolKeyOverride", commandProperties.executionIsolationThreadPoolKeyOverride().get()); + json.writeNumberField("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get()); + json.writeNumberField("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get()); + + /* + * The following are commented out as these rarely change and are verbose for streaming for something people don't change. + * We could perhaps allow a property or request argument to include these. + */ + + // json.put("propertyValue_metricsRollingPercentileEnabled", commandProperties.metricsRollingPercentileEnabled().get()); + // json.put("propertyValue_metricsRollingPercentileBucketSize", commandProperties.metricsRollingPercentileBucketSize().get()); + // json.put("propertyValue_metricsRollingPercentileWindow", commandProperties.metricsRollingPercentileWindowInMilliseconds().get()); + // json.put("propertyValue_metricsRollingPercentileWindowBuckets", commandProperties.metricsRollingPercentileWindowBuckets().get()); + // json.put("propertyValue_metricsRollingStatisticalWindowBuckets", commandProperties.metricsRollingStatisticalWindowBuckets().get()); + json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", commandProperties.metricsRollingStatisticalWindowInMilliseconds().get()); + + json.writeBooleanField("propertyValue_requestCacheEnabled", commandProperties.requestCacheEnabled().get()); + json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + json.writeStringField("threadPool", commandMetrics.getThreadPoolKey().name()); + + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + } catch (Exception t) { + LangUtil.rethrowUnchecked(t); + } + + return retVal; + } +} + + diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java new file mode 100644 index 000000000..c2407934d --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java @@ -0,0 +1,94 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; +import rx.functions.Func0; + +import java.io.ByteArrayOutputStream; +import java.util.stream.Stream; + +public class HystrixThreadPoolMetricsStream extends StreamingSupplier { + private static HystrixThreadPoolMetricsStream INSTANCE = new HystrixThreadPoolMetricsStream(); + + private HystrixThreadPoolMetricsStream() { + super(); + } + + public static HystrixThreadPoolMetricsStream getInstance() { + return INSTANCE; + } + + @Override + public boolean filter(HystrixThreadPoolMetrics threadPoolMetrics) { + return threadPoolMetrics.getCurrentCompletedTaskCount().intValue() > 0; + } + + @Override + public Observable get() { + return super.get(); + } + + @Override + protected byte[] getPayloadData(HystrixThreadPoolMetrics threadPoolMetrics) { + byte[] retVal = null; + + try { + HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + json.writeStartObject(); + + json.writeStringField("type", "HystrixThreadPool"); + json.writeStringField("name", key.name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue()); + json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue()); + json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue()); + json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue()); + json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue()); + json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); + json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); + json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); + safelyWriteNumberField(json, "rollingCountThreadsExecuted", new Func0() { + @Override + public Long call() { + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED); + } + }); + json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); + safelyWriteNumberField(json, "rollingCountCommandRejections", new Func0() { + @Override + public Long call() { + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED); + } + }); + + json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); + json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + @Override + protected Stream getStream() { + return HystrixThreadPoolMetrics.getInstances().stream(); + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java new file mode 100644 index 000000000..18c5f8ef0 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java @@ -0,0 +1,94 @@ +package com.netflix.hystrix.contrib.reactivesocket.requests; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; +import rx.functions.Func0; + +import java.io.ByteArrayOutputStream; +import java.util.stream.Stream; + +public class HystrixRequestEventsStream extends StreamingSupplier { + private static HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); + + private HystrixRequestEventsStream() { + super(); + } + + public static HystrixRequestEventsStream getInstance() { + return INSTANCE; + } + + @Override + public boolean filter(HystrixThreadPoolMetrics threadPoolMetrics) { + return threadPoolMetrics.getCurrentCompletedTaskCount().intValue() > 0; + } + + @Override + public Observable get() { + return super.get(); + } + + @Override + protected byte[] getPayloadData(HystrixThreadPoolMetrics threadPoolMetrics) { + byte[] retVal = null; + + try { + HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + json.writeStartObject(); + + json.writeStringField("type", "HystrixThreadPool"); + json.writeStringField("name", key.name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue()); + json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue()); + json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue()); + json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue()); + json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue()); + json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); + json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); + json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); + safelyWriteNumberField(json, "rollingCountThreadsExecuted", new Func0() { + @Override + public Long call() { + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED); + } + }); + json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); + safelyWriteNumberField(json, "rollingCountCommandRejections", new Func0() { + @Override + public Long call() { + return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED); + } + }); + + json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); + json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + @Override + protected Stream getStream() { + return HystrixThreadPoolMetrics.getInstances().stream(); + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java new file mode 100644 index 000000000..834a96b65 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java @@ -0,0 +1,7 @@ +package com.netflix.hystrix.contrib.reactivesocket.sample; + +/** + * Created by rroeser on 5/18/16. + */ +public class HystrixConfigStream { +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java new file mode 100644 index 000000000..762b2394f --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java @@ -0,0 +1,172 @@ +package com.netflix.hystrix.contrib.reactivesocket.sample; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixCollapserConfiguration; +import com.netflix.hystrix.config.HystrixCommandConfiguration; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; +import rx.subjects.BehaviorSubject; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.function.Supplier; + +public class HystrixUtilizationStream implements Supplier> { + private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(); + + private BehaviorSubject subject; + + private JsonFactory jsonFactory; + + private HystrixUtilizationStream() { + this.subject = BehaviorSubject.create(); + this.jsonFactory = new JsonFactory(); + + HystrixConfigurationStream stream = new HystrixConfigurationStream(500); + stream + .observe() + .map(this::getPayloadData) + .map(b -> new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }) + .subscribe(subject); + } + + public static HystrixUtilizationStream getInstance() { + return INSTANCE; + } + + @Override + public Observable get() { + return subject; + } + + public byte[] getPayloadData(HystrixConfiguration config) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixConfig"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: config.getCommandConfig().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandConfiguration commandConfig = entry.getValue(); + writeCommandConfigJson(json, key, commandConfig); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); + writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); + } + json.writeEndObject(); + + json.writeObjectFieldStart("collapsers"); + for (Map.Entry entry: config.getCollapserConfig().entrySet()) { + final HystrixCollapserKey collapserKey = entry.getKey(); + final HystrixCollapserConfiguration collapserConfig = entry.getValue(); + writeCollapserConfigJson(json, collapserKey, collapserConfig); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); + json.writeStringField("groupKey", commandConfig.getGroupKey().name()); + json.writeObjectFieldStart("execution"); + HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); + json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); + json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); + json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); + json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); + json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); + json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); + json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); + json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); + json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); + json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); + json.writeEndObject(); + json.writeObjectFieldStart("metrics"); + HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); + json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeObjectFieldStart("circuitBreaker"); + HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); + json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); + json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); + json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); + json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); + json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); + json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); + json.writeEndObject(); + json.writeEndObject(); + } + + private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); + json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); + json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); + json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + } + + private static void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { + json.writeObjectFieldStart(collapserKey.name()); + json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); + json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); + json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); + json.writeObjectFieldStart("metrics"); + HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeEndObject(); + } +} From 3bf3126ba4f715ae7183eb93b7a1199ea638b699 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Wed, 18 May 2016 16:24:38 -0700 Subject: [PATCH 2/7] updated gradle --- .../build.gradle | 22 +++ .../reactivesocket/EventStreamEnum.java | 59 +++++- .../EventStreamRequestHandler.java | 60 +++++- .../reactivesocket/StreamingSupplier.java | 78 +++++++- .../HystrixCollasperMetricsStream.java | 1 - .../requests/HystrixRequestEventsStream.java | 139 ++++++++------ .../sample/HystrixConfigStream.java | 173 +++++++++++++++++- .../sample/HystrixUtilizationStream.java | 101 ++-------- settings.gradle | 2 + 9 files changed, 488 insertions(+), 147 deletions(-) create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle new file mode 100644 index 000000000..b8baf507c --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle @@ -0,0 +1,22 @@ +repositories { + mavenCentral() + jcenter() + maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } +} + + +dependencies { + compile project(':hystrix-core') + + compile 'io.reactivex:rxjava-reactive-streams:latest.release' + + compile 'com.fasterxml.jackson.core:jackson-core:latest.release' + compile 'com.fasterxml.jackson.core:jackson-databind:latest.release' + compile 'com.fasterxml.jackson.core:jackson-annotations:latest.release' + compile 'com.fasterxml.jackson.module:jackson-module-afterburner:latest.release' + compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:latest.release' + compile 'io.reactivesocket:reactivesocket:latest.release' + + testCompile 'junit:junit-dep:4.10' + testCompile 'org.mockito:mockito-all:1.9.5' +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java index 747ffbe48..8658fd3ad 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -1,5 +1,62 @@ package com.netflix.hystrix.contrib.reactivesocket; -public enum EventStreamEnum { +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCollasperMetricsStream; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixThreadPoolMetricsStream; +import com.netflix.hystrix.contrib.reactivesocket.requests.HystrixRequestEventsStream; +import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixConfigStream; +import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixUtilizationStream; +import io.reactivesocket.Payload; +import rx.Observable; + +import java.util.Arrays; +import java.util.function.Supplier; + +public enum EventStreamEnum implements Supplier> { + + CONFIG_STREAM(1) { + @Override + public Observable get() { + return HystrixConfigStream.getInstance().get(); + } + }, + REQUEST_EVENT_STREAM(2) { + @Override + public Observable get() { + return HystrixRequestEventsStream.getInstance().get(); + } + }, + UTILIZATION_EVENT_STREAM(3) { + @Override + public Observable get() { + return HystrixUtilizationStream.getInstance().get(); + } + }, + METRICS_STREAM(4) { + @Override + public Observable get() { + return Observable.merge( + HystrixCommandMetricsStream.getInstance().get(), + HystrixThreadPoolMetricsStream.getInstance().get(), + HystrixCollasperMetricsStream.getInstance().get()); + } + } + + ; + + private int typeId; + + EventStreamEnum(int typeId) { + this.typeId = typeId; + } + + public static EventStreamEnum findByTypeId(int typeId) { + return Arrays + .asList(EventStreamEnum.findByTypeId(typeId)) + .stream() + .filter(t -> t.typeId == typeId) + .findAny() + .orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId)); + } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java index 40df6f625..0e78ee1de 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -1,7 +1,63 @@ package com.netflix.hystrix.contrib.reactivesocket; +import io.reactivesocket.Payload; +import io.reactivesocket.RequestHandler; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.RxReactiveStreams; + /** - * Created by rroeser on 5/17/16. + * An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an integer which corresponds to + * an id in {@link EventStreamEnum}. If the id is found it will be again stream the events to the subscriber. */ -public class EventStreamRequestHandler { +public class EventStreamRequestHandler extends RequestHandler { + private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class); + + @Override + public Publisher handleRequestResponse(Payload payload) { + return NO_REQUEST_RESPONSE_HANDLER.apply(payload); + } + + @Override + public Publisher handleRequestStream(Payload payload) { + return NO_REQUEST_STREAM_HANDLER.apply(payload); + } + + @Override + public Publisher handleSubscription(Payload payload) { + Observable defer = Observable + .defer(() -> { + try { + int typeId = payload + .getData() + .getInt(0); + + EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); + return eventStreamEnum + .get(); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + return Observable.error(t); + } + }); + + return RxReactiveStreams.toPublisher(defer); + } + + @Override + public Publisher handleFireAndForget(Payload payload) { + return NO_FIRE_AND_FORGET_HANDLER.apply(payload); + } + + @Override + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + return NO_REQUEST_CHANNEL_HANDLER.apply(inputs); + } + + @Override + public Publisher handleMetadataPush(Payload payload) { + return NO_METADATA_PUSH_HANDLER.apply(payload); + } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java index 1fa93ebaf..904bd19bf 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java @@ -1,7 +1,77 @@ package com.netflix.hystrix.contrib.reactivesocket; -/** - * Created by rroeser on 5/18/16. - */ -public class StreamingSupplier { +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.functions.Func0; +import rx.schedulers.Schedulers; +import rx.subjects.BehaviorSubject; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Stream; + +public abstract class StreamingSupplier implements Supplier> { + + protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class); + + protected BehaviorSubject subject; + + protected final JsonFactory jsonFactory; + + protected StreamingSupplier() { + subject = BehaviorSubject.create(); + jsonFactory = new JsonFactory(); + + Observable + .interval(500, TimeUnit.MILLISECONDS, Schedulers.computation()) + .doOnNext(i -> + getStream() + .filter(this::filter) + .map(this::getPayloadData) + .forEach(b -> { + Payload p = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + + subject.onNext(p); + }) + ); + } + + public boolean filter(T t) { + return true; + } + + @Override + public Observable get() { + return subject; + } + + protected abstract Stream getStream(); + + protected abstract byte[] getPayloadData(T t); + + protected void safelyWriteNumberField(JsonGenerator json, String name, Func0 metricGenerator) throws IOException { + try { + json.writeNumberField(name, metricGenerator.call()); + } catch (NoSuchFieldError error) { + logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!"); + json.writeNumberField(name, 0L); + } + } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java index 8c6001bcc..7095c90d5 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java @@ -104,4 +104,3 @@ public Long call() { } } -} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java index 18c5f8ef0..9d47f45de 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java @@ -1,85 +1,73 @@ package com.netflix.hystrix.contrib.reactivesocket.requests; +import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.ExecutionResult; import com.netflix.hystrix.HystrixEventType; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.HystrixThreadPoolMetrics; -import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; +import com.netflix.hystrix.metric.HystrixRequestEvents; +import io.reactivesocket.Frame; import io.reactivesocket.Payload; import org.agrona.LangUtil; import rx.Observable; -import rx.functions.Func0; +import rx.schedulers.Schedulers; +import rx.subjects.BehaviorSubject; import java.io.ByteArrayOutputStream; -import java.util.stream.Stream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; -public class HystrixRequestEventsStream extends StreamingSupplier { +public class HystrixRequestEventsStream implements Supplier> { private static HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); + private BehaviorSubject subject; + + private JsonFactory jsonFactory; + private HystrixRequestEventsStream() { - super(); + this.jsonFactory = new JsonFactory(); + this.subject = BehaviorSubject.create(); + + com.netflix.hystrix.metric.HystrixRequestEventsStream.getInstance() + .observe() + .observeOn(Schedulers.computation()) + .map(this::getPayloadData) + .map(b -> + new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }) + .subscribe(subject); } public static HystrixRequestEventsStream getInstance() { return INSTANCE; } - @Override - public boolean filter(HystrixThreadPoolMetrics threadPoolMetrics) { - return threadPoolMetrics.getCurrentCompletedTaskCount().intValue() > 0; - } - @Override public Observable get() { - return super.get(); + return subject; } - @Override - protected byte[] getPayloadData(HystrixThreadPoolMetrics threadPoolMetrics) { + public byte[] getPayloadData(HystrixRequestEvents requestEvents) { byte[] retVal = null; try { - HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); JsonGenerator json = jsonFactory.createGenerator(bos); - json.writeStartObject(); - - json.writeStringField("type", "HystrixThreadPool"); - json.writeStringField("name", key.name()); - json.writeNumberField("currentTime", System.currentTimeMillis()); - - json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue()); - json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue()); - json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue()); - json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue()); - json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue()); - json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); - json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); - json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); - safelyWriteNumberField(json, "rollingCountThreadsExecuted", new Func0() { - @Override - public Long call() { - return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED); - } - }); - json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); - safelyWriteNumberField(json, "rollingCountCommandRejections", new Func0() { - @Override - public Long call() { - return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED); - } - }); - - json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); - json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); - - json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster - - json.writeEndObject(); + writeRequestAsJson(json, requestEvents); json.close(); retVal = bos.toByteArray(); - } catch (Exception e) { LangUtil.rethrowUnchecked(e); } @@ -87,8 +75,51 @@ public Long call() { return retVal; } - @Override - protected Stream getStream() { - return HystrixThreadPoolMetrics.getInstances().stream(); + private void writeRequestAsJson(JsonGenerator json, HystrixRequestEvents request) throws IOException { + json.writeStartArray(); + + for (Map.Entry> entry: request.getExecutionsMappedToLatencies().entrySet()) { + convertExecutionToJson(json, entry.getKey(), entry.getValue()); + } + + json.writeEndArray(); + } + + private void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.ExecutionSignature executionSignature, List latencies) throws IOException { + json.writeStartObject(); + json.writeStringField("name", executionSignature.getCommandName()); + json.writeArrayFieldStart("events"); + ExecutionResult.EventCounts eventCounts = executionSignature.getEventCounts(); + for (HystrixEventType eventType: HystrixEventType.values()) { + if (!eventType.equals(HystrixEventType.COLLAPSED)) { + if (eventCounts.contains(eventType)) { + int eventCount = eventCounts.getCount(eventType); + if (eventCount > 1) { + json.writeStartObject(); + json.writeStringField("name", eventType.name()); + json.writeNumberField("count", eventCount); + json.writeEndObject(); + } else { + json.writeString(eventType.name()); + } + } + } + } + json.writeEndArray(); + json.writeArrayFieldStart("latencies"); + for (int latency: latencies) { + json.writeNumber(latency); + } + json.writeEndArray(); + if (executionSignature.getCachedCount() > 0) { + json.writeNumberField("cached", executionSignature.getCachedCount()); + } + if (executionSignature.getEventCounts().contains(HystrixEventType.COLLAPSED)) { + json.writeObjectFieldStart("collapsed"); + json.writeStringField("name", executionSignature.getCollapserKey().name()); + json.writeNumberField("count", executionSignature.getCollapserBatchSize()); + json.writeEndObject(); + } + json.writeEndObject(); } } \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java index 834a96b65..20d1c70e5 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java @@ -1,7 +1,172 @@ package com.netflix.hystrix.contrib.reactivesocket.sample; -/** - * Created by rroeser on 5/18/16. - */ -public class HystrixConfigStream { +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixCollapserConfiguration; +import com.netflix.hystrix.config.HystrixCommandConfiguration; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.LangUtil; +import rx.Observable; +import rx.subjects.BehaviorSubject; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.function.Supplier; + +public class HystrixConfigStream implements Supplier> { + private static final HystrixConfigStream INSTANCE = new HystrixConfigStream(); + + private BehaviorSubject subject; + + private JsonFactory jsonFactory; + + private HystrixConfigStream() { + this.subject = BehaviorSubject.create(); + this.jsonFactory = new JsonFactory(); + + HystrixConfigurationStream stream = new HystrixConfigurationStream(100); + stream + .observe() + .map(this::getPayloadData) + .map(b -> new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(b); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }) + .subscribe(subject); + } + + public static HystrixConfigStream getInstance() { + return INSTANCE; + } + + @Override + public Observable get() { + return subject; + } + + public byte[] getPayloadData(HystrixConfiguration config) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = jsonFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixConfig"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: config.getCommandConfig().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandConfiguration commandConfig = entry.getValue(); + writeCommandConfigJson(json, key, commandConfig); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); + writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); + } + json.writeEndObject(); + + json.writeObjectFieldStart("collapsers"); + for (Map.Entry entry: config.getCollapserConfig().entrySet()) { + final HystrixCollapserKey collapserKey = entry.getKey(); + final HystrixCollapserConfiguration collapserConfig = entry.getValue(); + writeCollapserConfigJson(json, collapserKey, collapserConfig); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); + json.writeStringField("groupKey", commandConfig.getGroupKey().name()); + json.writeObjectFieldStart("execution"); + HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); + json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); + json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); + json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); + json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); + json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); + json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); + json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); + json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); + json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); + json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); + json.writeEndObject(); + json.writeObjectFieldStart("metrics"); + HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); + json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeObjectFieldStart("circuitBreaker"); + HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); + json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); + json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); + json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); + json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); + json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); + json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); + json.writeEndObject(); + json.writeEndObject(); + } + + private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); + json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); + json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); + json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + } + + private static void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { + json.writeObjectFieldStart(collapserKey.name()); + json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); + json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); + json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); + json.writeObjectFieldStart("metrics"); + HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeEndObject(); + } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java index 762b2394f..745ca1a7d 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java @@ -2,14 +2,11 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; -import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.config.HystrixCollapserConfiguration; -import com.netflix.hystrix.config.HystrixCommandConfiguration; -import com.netflix.hystrix.config.HystrixConfiguration; -import com.netflix.hystrix.config.HystrixConfigurationStream; -import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; +import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilization; import io.reactivesocket.Frame; import io.reactivesocket.Payload; import org.agrona.LangUtil; @@ -33,7 +30,8 @@ private HystrixUtilizationStream() { this.subject = BehaviorSubject.create(); this.jsonFactory = new JsonFactory(); - HystrixConfigurationStream stream = new HystrixConfigurationStream(500); + com.netflix.hystrix.metric.sample.HystrixUtilizationStream stream + = new com.netflix.hystrix.metric.sample.HystrixUtilizationStream(100); stream .observe() .map(this::getPayloadData) @@ -60,7 +58,7 @@ public Observable get() { return subject; } - public byte[] getPayloadData(HystrixConfiguration config) { + public byte[] getPayloadData(HystrixUtilization utilization) { byte[] retVal = null; try { @@ -68,35 +66,26 @@ public byte[] getPayloadData(HystrixConfiguration config) { JsonGenerator json = jsonFactory.createGenerator(bos); json.writeStartObject(); - json.writeStringField("type", "HystrixConfig"); + json.writeStringField("type", "HystrixUtilization"); json.writeObjectFieldStart("commands"); - for (Map.Entry entry: config.getCommandConfig().entrySet()) { + for (Map.Entry entry: utilization.getCommandUtilizationMap().entrySet()) { final HystrixCommandKey key = entry.getKey(); - final HystrixCommandConfiguration commandConfig = entry.getValue(); - writeCommandConfigJson(json, key, commandConfig); + final HystrixCommandUtilization commandUtilization = entry.getValue(); + writeCommandUtilizationJson(json, key, commandUtilization); } json.writeEndObject(); json.writeObjectFieldStart("threadpools"); - for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + for (Map.Entry entry: utilization.getThreadPoolUtilizationMap().entrySet()) { final HystrixThreadPoolKey threadPoolKey = entry.getKey(); - final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); - writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); - } - json.writeEndObject(); - - json.writeObjectFieldStart("collapsers"); - for (Map.Entry entry: config.getCollapserConfig().entrySet()) { - final HystrixCollapserKey collapserKey = entry.getKey(); - final HystrixCollapserConfiguration collapserConfig = entry.getValue(); - writeCollapserConfigJson(json, collapserKey, collapserConfig); + final HystrixThreadPoolUtilization threadPoolUtilization = entry.getValue(); + writeThreadPoolUtilizationJson(json, threadPoolKey, threadPoolUtilization); } json.writeEndObject(); json.writeEndObject(); json.close(); - retVal = bos.toByteArray(); } catch (Exception e) { LangUtil.rethrowUnchecked(e); @@ -105,68 +94,18 @@ public byte[] getPayloadData(HystrixConfiguration config) { return retVal; } - private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { + private static void writeCommandUtilizationJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandUtilization utilization) throws IOException { json.writeObjectFieldStart(key.name()); - json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); - json.writeStringField("groupKey", commandConfig.getGroupKey().name()); - json.writeObjectFieldStart("execution"); - HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); - json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); - json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); - json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); - json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); - json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); - json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); - json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); - json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); - json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); - json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); - json.writeEndObject(); - json.writeObjectFieldStart("metrics"); - HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); - json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); - json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); - json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); - json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - json.writeObjectFieldStart("circuitBreaker"); - HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); - json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); - json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); - json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); - json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); - json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); - json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); - json.writeEndObject(); + json.writeNumberField("activeCount", utilization.getConcurrentCommandCount()); json.writeEndObject(); } - private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { + private static void writeThreadPoolUtilizationJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolUtilization utilization) throws IOException { json.writeObjectFieldStart(threadPoolKey.name()); - json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); - json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); - json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); - json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); - json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - } - - private static void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { - json.writeObjectFieldStart(collapserKey.name()); - json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); - json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); - json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); - json.writeObjectFieldStart("metrics"); - HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); - json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); - json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); - json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); + json.writeNumberField("activeCount", utilization.getCurrentActiveCount()); + json.writeNumberField("queueSize", utilization.getCurrentQueueSize()); + json.writeNumberField("corePoolSize", utilization.getCurrentCorePoolSize()); + json.writeNumberField("poolSize", utilization.getCurrentPoolSize()); json.writeEndObject(); } } diff --git a/settings.gradle b/settings.gradle index 7405bbb1f..24d313735 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,6 +10,7 @@ include 'hystrix-core', \ 'hystrix-contrib/hystrix-codahale-metrics-publisher', \ 'hystrix-contrib/hystrix-yammer-metrics-publisher', \ 'hystrix-contrib/hystrix-network-auditor-agent', \ +'hystrix-contrib/hystrix-reactivesocket-event-stream', \ 'hystrix-contrib/hystrix-javanica', \ 'hystrix-contrib/hystrix-junit', \ 'hystrix-dashboard' @@ -20,6 +21,7 @@ project(':hystrix-contrib/hystrix-servo-metrics-publisher').name = 'hystrix-serv project(':hystrix-contrib/hystrix-metrics-event-stream').name = 'hystrix-metrics-event-stream' project(':hystrix-contrib/hystrix-rx-netty-metrics-stream').name = 'hystrix-rx-netty-metrics-stream' project(':hystrix-contrib/hystrix-codahale-metrics-publisher').name = 'hystrix-codahale-metrics-publisher' +project(':hystrix-contrib/hystrix-reactivesocket-event-stream').name = 'hystrix-reactivesocket-event-stream' project(':hystrix-contrib/hystrix-yammer-metrics-publisher').name = 'hystrix-yammer-metrics-publisher' project(':hystrix-contrib/hystrix-network-auditor-agent').name = 'hystrix-network-auditor-agent' project(':hystrix-contrib/hystrix-javanica').name = 'hystrix-javanica' From 206d2a2803128bb74086ff4bfa86546992a6c049 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Thu, 19 May 2016 21:19:29 -0700 Subject: [PATCH 3/7] added test and enum findby id --- gradle/wrapper/gradle-wrapper.properties | 4 +- .../build.gradle | 2 + .../reactivesocket/EventStreamEnum.java | 6 +- .../EventStreamRequestHandler.java | 6 +- .../reactivesocket/StreamingSupplier.java | 4 +- .../reactivesocket/EventStreamEnumTest.java | 8 ++ .../EventStreamRequestHandlerTest.java | 111 ++++++++++++++++++ .../HystrixCollasperMetricsStreamTest.java | 49 ++++++++ .../HystrixCommandMetricsStreamTest.java | 51 ++++++++ .../sample/HystrixConfigStreamTest.java | 50 ++++++++ 10 files changed, 285 insertions(+), 6 deletions(-) create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 5fd3d2d5e..43c4dcbe3 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Wed Dec 02 15:47:21 PST 2015 +#Thu May 19 16:56:49 PDT 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-all.zip diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle index b8baf507c..44a2d8e63 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle @@ -4,6 +4,8 @@ repositories { maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } } +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 dependencies { compile project(':hystrix-core') diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java index 8658fd3ad..79611a70a 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -53,10 +53,14 @@ public Observable get() { public static EventStreamEnum findByTypeId(int typeId) { return Arrays - .asList(EventStreamEnum.findByTypeId(typeId)) + .asList(EventStreamEnum.values()) .stream() .filter(t -> t.typeId == typeId) .findAny() .orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId)); } + + public int getTypeId() { + return typeId; + } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java index 0e78ee1de..48a964edf 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -41,9 +41,11 @@ public Publisher handleSubscription(Payload payload) { logger.error(t.getMessage(), t); return Observable.error(t); } - }); + }) + .onBackpressureDrop(); - return RxReactiveStreams.toPublisher(defer); + return RxReactiveStreams + .toPublisher(defer); } @Override diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java index 904bd19bf..73b3805c2 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java @@ -50,7 +50,9 @@ public ByteBuffer getMetadata() { subject.onNext(p); }) - ); + ) + .retry() + .subscribe(); } public boolean filter(T t) { diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java new file mode 100644 index 000000000..6a975f366 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java @@ -0,0 +1,8 @@ +package com.netflix.hystrix.contrib.reactivesocket; + + +public class EventStreamEnumTest { + public void test() { + + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java new file mode 100644 index 000000000..fc1667943 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java @@ -0,0 +1,111 @@ +package com.netflix.hystrix.contrib.reactivesocket; + + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.BitUtil; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import rx.schedulers.Schedulers; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class EventStreamRequestHandlerTest { + @Test + public void testEventStream() throws Exception { + Payload payload = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer + .allocate(BitUtil.SIZE_OF_INT) + .putInt(EventStreamEnum.METRICS_STREAM.getTypeId()); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + + Schedulers + .io() + .createWorker() + .schedulePeriodically(() -> { + TestCommand testCommand = new TestCommand(); + testCommand.execute(); + }, 0, 1, TimeUnit.MILLISECONDS); + + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch1 = new CountDownLatch(5); + CountDownLatch latch2 = new CountDownLatch(15); + + AtomicReference subscriptionAtomicReference = new AtomicReference<>(); + + EventStreamRequestHandler handler = new EventStreamRequestHandler(); + Publisher payloadPublisher = handler.handleSubscription(payload); + + payloadPublisher + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionAtomicReference.set(s); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + + latch1.countDown(); + latch2.countDown(); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + }); + + latch.await(); + + Subscription subscription = subscriptionAtomicReference.get(); + subscription.request(5); + + latch1.await(); + + long count = latch2.getCount(); + Assert.assertTrue(count < 15); + + subscription.request(100); + + latch2.await(); + + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + return true; + } + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java new file mode 100644 index 000000000..7071ac6ec --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java @@ -0,0 +1,49 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +public class HystrixCollasperMetricsStreamTest { + + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(21); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(); + + test.execute(); + latch.countDown(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + return true; + } + } + + +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java new file mode 100644 index 000000000..7cbe344f5 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java @@ -0,0 +1,51 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +/** + * Created by rroeser on 5/19/16. + */ +public class HystrixCommandMetricsStreamTest { + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(23); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(latch); + + test.execute(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + CountDownLatch latch; + protected TestCommand(CountDownLatch latch) { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + this.latch = latch; + } + + @Override + protected Boolean run() throws Exception { + latch.countDown(); + return true; + } + } + +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java new file mode 100644 index 000000000..5212bab00 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java @@ -0,0 +1,50 @@ +package com.netflix.hystrix.contrib.reactivesocket.sample; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +/** + * Created by rroeser on 5/19/16. + */ +public class HystrixConfigStreamTest { + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(); + + test.execute(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + System.out.println("IM A HYSTRIX COMMAND!!!!!"); + return true; + } + } +} \ No newline at end of file From acab36545819a5d4dd024786eba185a39cb357e1 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Thu, 19 May 2016 21:37:54 -0700 Subject: [PATCH 4/7] updated test --- .../contrib/reactivesocket/EventStreamRequestHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java index fc1667943..24a06bd0a 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicReference; public class EventStreamRequestHandlerTest { - @Test + @Test(timeout = 5_000) public void testEventStream() throws Exception { Payload payload = new Payload() { @Override From 285e4eaf592a0f48ffe26223c45e33b49c7859ba Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sun, 22 May 2016 18:22:32 -0700 Subject: [PATCH 5/7] added logging, and updated test --- .../reactivesocket/EventStreamEnum.java | 8 ++ .../EventStreamRequestHandlerTest.java | 75 ++++++++++++++++++- 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java index 79611a70a..d70efa23c 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -8,6 +8,8 @@ import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixConfigStream; import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixUtilizationStream; import io.reactivesocket.Payload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; import java.util.Arrays; @@ -18,24 +20,28 @@ public enum EventStreamEnum implements Supplier> { CONFIG_STREAM(1) { @Override public Observable get() { + logger.info("streaming config data"); return HystrixConfigStream.getInstance().get(); } }, REQUEST_EVENT_STREAM(2) { @Override public Observable get() { + logger.info("streaming request events"); return HystrixRequestEventsStream.getInstance().get(); } }, UTILIZATION_EVENT_STREAM(3) { @Override public Observable get() { + logger.info("streaming utilization events"); return HystrixUtilizationStream.getInstance().get(); } }, METRICS_STREAM(4) { @Override public Observable get() { + logger.info("streaming metrics"); return Observable.merge( HystrixCommandMetricsStream.getInstance().get(), HystrixThreadPoolMetricsStream.getInstance().get(), @@ -45,6 +51,8 @@ public Observable get() { ; + private static final Logger logger = LoggerFactory.getLogger(EventStreamEnum.class); + private int typeId; EventStreamEnum(int typeId) { diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java index 24a06bd0a..8c7a55855 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java @@ -19,8 +19,8 @@ import java.util.concurrent.atomic.AtomicReference; public class EventStreamRequestHandlerTest { - @Test(timeout = 5_000) - public void testEventStream() throws Exception { + @Test(timeout = 10_000) + public void testEventStreamRequestN() throws Exception { Payload payload = new Payload() { @Override public ByteBuffer getData() { @@ -98,6 +98,77 @@ public void onComplete() { } + @Test(timeout = 10_000) + public void testEventStreamFireHose() throws Exception { + Payload payload = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer + .allocate(BitUtil.SIZE_OF_INT) + .putInt(EventStreamEnum.METRICS_STREAM.getTypeId()); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + + Schedulers + .io() + .createWorker() + .schedulePeriodically(() -> { + TestCommand testCommand = new TestCommand(); + testCommand.execute(); + }, 0, 1, TimeUnit.MILLISECONDS); + + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch1 = new CountDownLatch(25); + + AtomicReference subscriptionAtomicReference = new AtomicReference<>(); + + EventStreamRequestHandler handler = new EventStreamRequestHandler(); + Publisher payloadPublisher = handler.handleSubscription(payload); + + payloadPublisher + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionAtomicReference.set(s); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + + latch1.countDown(); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + }); + + latch.await(); + + Subscription subscription = subscriptionAtomicReference.get(); + subscription.request(Long.MAX_VALUE); + + latch1.await(); + + + } + class TestCommand extends HystrixCommand { protected TestCommand() { super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); From bdfd8f82509db9dc488c0a18040e6dd7ea4dc7c5 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Sun, 22 May 2016 18:40:49 -0700 Subject: [PATCH 6/7] switched to cbor and added a base class --- .../reactivesocket/BasePayloadSupplier.java | 19 +++++++++++++++++++ .../EventStreamRequestHandler.java | 5 +++-- .../reactivesocket/StreamingSupplier.java | 11 +---------- .../requests/HystrixRequestEventsStream.java | 13 +++---------- .../sample/HystrixConfigStream.java | 13 +++---------- .../sample/HystrixUtilizationStream.java | 13 +++---------- 6 files changed, 32 insertions(+), 42 deletions(-) create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java new file mode 100644 index 000000000..7d82215f3 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java @@ -0,0 +1,19 @@ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.fasterxml.jackson.dataformat.cbor.CBORFactory; +import io.reactivesocket.Payload; +import rx.Observable; +import rx.subjects.BehaviorSubject; + +import java.util.function.Supplier; + +public abstract class BasePayloadSupplier implements Supplier> { + protected final CBORFactory jsonFactory; + + protected final BehaviorSubject subject; + + protected BasePayloadSupplier() { + this.jsonFactory = new CBORFactory(); + this.subject = BehaviorSubject.create(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java index 48a964edf..35b90e86d 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -9,8 +9,9 @@ import rx.RxReactiveStreams; /** - * An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an integer which corresponds to - * an id in {@link EventStreamEnum}. If the id is found it will be again stream the events to the subscriber. + * An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an 32-bit integer in the {@link Payload} + * data of a ReactiveSocket {@link io.reactivesocket.Frame} which corresponds to an id in {@link EventStreamEnum}. If + * the id is found it will begin to stream the events to the subscriber. */ public class EventStreamRequestHandler extends RequestHandler { private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class); diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java index 73b3805c2..807e2d50f 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java @@ -1,6 +1,5 @@ package com.netflix.hystrix.contrib.reactivesocket; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import io.reactivesocket.Frame; import io.reactivesocket.Payload; @@ -9,25 +8,17 @@ import rx.Observable; import rx.functions.Func0; import rx.schedulers.Schedulers; -import rx.subjects.BehaviorSubject; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import java.util.stream.Stream; -public abstract class StreamingSupplier implements Supplier> { +public abstract class StreamingSupplier extends BasePayloadSupplier { protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class); - protected BehaviorSubject subject; - - protected final JsonFactory jsonFactory; - protected StreamingSupplier() { - subject = BehaviorSubject.create(); - jsonFactory = new JsonFactory(); Observable .interval(500, TimeUnit.MILLISECONDS, Schedulers.computation()) diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java index 9d47f45de..7cf85a3d8 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java @@ -1,34 +1,27 @@ package com.netflix.hystrix.contrib.reactivesocket.requests; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.ExecutionResult; import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; import com.netflix.hystrix.metric.HystrixRequestEvents; import io.reactivesocket.Frame; import io.reactivesocket.Payload; import org.agrona.LangUtil; import rx.Observable; import rx.schedulers.Schedulers; -import rx.subjects.BehaviorSubject; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.function.Supplier; -public class HystrixRequestEventsStream implements Supplier> { +public class HystrixRequestEventsStream extends BasePayloadSupplier { private static HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); - private BehaviorSubject subject; - - private JsonFactory jsonFactory; - private HystrixRequestEventsStream() { - this.jsonFactory = new JsonFactory(); - this.subject = BehaviorSubject.create(); + super(); com.netflix.hystrix.metric.HystrixRequestEventsStream.getInstance() .observe() diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java index 20d1c70e5..64f780f3d 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java @@ -1,6 +1,5 @@ package com.netflix.hystrix.contrib.reactivesocket.sample; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCommandKey; @@ -10,28 +9,22 @@ import com.netflix.hystrix.config.HystrixConfiguration; import com.netflix.hystrix.config.HystrixConfigurationStream; import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; import io.reactivesocket.Frame; import io.reactivesocket.Payload; import org.agrona.LangUtil; import rx.Observable; -import rx.subjects.BehaviorSubject; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.function.Supplier; -public class HystrixConfigStream implements Supplier> { +public class HystrixConfigStream extends BasePayloadSupplier { private static final HystrixConfigStream INSTANCE = new HystrixConfigStream(); - private BehaviorSubject subject; - - private JsonFactory jsonFactory; - private HystrixConfigStream() { - this.subject = BehaviorSubject.create(); - this.jsonFactory = new JsonFactory(); + super(); HystrixConfigurationStream stream = new HystrixConfigurationStream(100); stream diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java index 745ca1a7d..b5f9257da 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java @@ -1,9 +1,9 @@ package com.netflix.hystrix.contrib.reactivesocket.sample; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; import com.netflix.hystrix.metric.sample.HystrixUtilization; @@ -11,24 +11,17 @@ import io.reactivesocket.Payload; import org.agrona.LangUtil; import rx.Observable; -import rx.subjects.BehaviorSubject; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.function.Supplier; -public class HystrixUtilizationStream implements Supplier> { +public class HystrixUtilizationStream extends BasePayloadSupplier { private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(); - private BehaviorSubject subject; - - private JsonFactory jsonFactory; - private HystrixUtilizationStream() { - this.subject = BehaviorSubject.create(); - this.jsonFactory = new JsonFactory(); + super(); com.netflix.hystrix.metric.sample.HystrixUtilizationStream stream = new com.netflix.hystrix.metric.sample.HystrixUtilizationStream(100); From 98f96f78d79033b93891379e13ebd76523d4121e Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Mon, 23 May 2016 10:57:26 -0700 Subject: [PATCH 7/7] fixed typo, removed empty test --- .../hystrix/contrib/reactivesocket/EventStreamEnum.java | 4 ++-- ...ricsStream.java => HystrixCollapserMetricsStream.java} | 8 ++++---- .../contrib/reactivesocket/EventStreamEnumTest.java | 8 -------- ...amTest.java => HystrixCollapserMetricsStreamTest.java} | 2 +- 4 files changed, 7 insertions(+), 15 deletions(-) rename hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/{HystrixCollasperMetricsStream.java => HystrixCollapserMetricsStream.java} (94%) delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java rename hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/{HystrixCollasperMetricsStreamTest.java => HystrixCollapserMetricsStreamTest.java} (95%) diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java index d70efa23c..5882e4c12 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -1,7 +1,7 @@ package com.netflix.hystrix.contrib.reactivesocket; -import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCollasperMetricsStream; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCollapserMetricsStream; import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixThreadPoolMetricsStream; import com.netflix.hystrix.contrib.reactivesocket.requests.HystrixRequestEventsStream; @@ -45,7 +45,7 @@ public Observable get() { return Observable.merge( HystrixCommandMetricsStream.getInstance().get(), HystrixThreadPoolMetricsStream.getInstance().get(), - HystrixCollasperMetricsStream.getInstance().get()); + HystrixCollapserMetricsStream.getInstance().get()); } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java similarity index 94% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java rename to hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java index 7095c90d5..d936c865e 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java @@ -12,14 +12,14 @@ import java.io.ByteArrayOutputStream; import java.util.stream.Stream; -public class HystrixCollasperMetricsStream extends StreamingSupplier { - private static HystrixCollasperMetricsStream INSTANCE = new HystrixCollasperMetricsStream(); +public class HystrixCollapserMetricsStream extends StreamingSupplier { + private static HystrixCollapserMetricsStream INSTANCE = new HystrixCollapserMetricsStream(); - private HystrixCollasperMetricsStream() { + private HystrixCollapserMetricsStream() { super(); } - public static HystrixCollasperMetricsStream getInstance() { + public static HystrixCollapserMetricsStream getInstance() { return INSTANCE; } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java deleted file mode 100644 index 6a975f366..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket; - - -public class EventStreamEnumTest { - public void test() { - - } -} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java similarity index 95% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java rename to hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java index 7071ac6ec..a65d6ff89 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java @@ -7,7 +7,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; -public class HystrixCollasperMetricsStreamTest { +public class HystrixCollapserMetricsStreamTest { @Test public void test() throws Exception {