-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
HystrixConcurrencyStrategy.java
173 lines (160 loc) · 8.77 KB
/
HystrixConcurrencyStrategy.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.strategy.concurrency;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixThreadPool;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import com.netflix.hystrix.util.PlatformSpecific;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Abstract class for defining different behavior or implementations for concurrency related aspects of the system with default implementations.
* <p>
* For example, every {@link Callable} executed by {@link HystrixCommand} will call {@link #wrapCallable(Callable)} to give a chance for custom implementations to decorate the {@link Callable} with
* additional behavior.
* <p>
* When you implement a concrete {@link HystrixConcurrencyStrategy}, you should make the strategy idempotent w.r.t ThreadLocals.
* Since the usage of threads by Hystrix is internal, Hystrix does not attempt to apply the strategy in an idempotent way.
* Instead, you should write your strategy to work idempotently. See https://github.com/Netflix/Hystrix/issues/351 for a more detailed discussion.
* <p>
* See {@link HystrixPlugins} or the Hystrix GitHub Wiki for information on configuring plugins: <a
* href="https://github.com/Netflix/Hystrix/wiki/Plugins">https://github.com/Netflix/Hystrix/wiki/Plugins</a>.
*/
public abstract class HystrixConcurrencyStrategy {
private final static Logger logger = LoggerFactory.getLogger(HystrixConcurrencyStrategy.class);
/**
* Factory method to provide {@link ThreadPoolExecutor} instances as desired.
* <p>
* Note that the corePoolSize, maximumPoolSize and keepAliveTime values will be dynamically set during runtime if their values change using the {@link ThreadPoolExecutor#setCorePoolSize},
* {@link ThreadPoolExecutor#setMaximumPoolSize} and {@link ThreadPoolExecutor#setKeepAliveTime} methods.
* <p>
* <b>Default Implementation</b>
* <p>
* Implementation using standard java.util.concurrent.ThreadPoolExecutor
*
* @param threadPoolKey
* {@link HystrixThreadPoolKey} representing the {@link HystrixThreadPool} that this {@link ThreadPoolExecutor} will be used for.
* @param corePoolSize
* Core number of threads requested via properties (or system default if no properties set).
* @param maximumPoolSize
* Max number of threads requested via properties (or system default if no properties set).
* @param keepAliveTime
* Keep-alive time for threads requested via properties (or system default if no properties set).
* @param unit
* {@link TimeUnit} corresponding with keepAliveTime
* @param workQueue
* {@code BlockingQueue<Runnable>} as provided by {@link #getBlockingQueue(int)}
* @return instance of {@link ThreadPoolExecutor}
*/
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
ThreadFactory threadFactory = null;
if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
threadFactory = new ThreadFactory() {
protected final AtomicInteger threadNumber = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
} else {
threadFactory = PlatformSpecific.getAppEngineThreadFactory();
}
final int dynamicCoreSize = corePoolSize.get();
final int dynamicMaximumSize = maximumPoolSize.get();
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
}
}
/**
* Factory method to provide instance of {@code BlockingQueue<Runnable>} used for each {@link ThreadPoolExecutor} as constructed in {@link #getThreadPool}.
* <p>
* Note: The maxQueueSize value is provided so any type of queue can be used but typically an implementation such as {@link SynchronousQueue} without a queue (just a handoff) is preferred as
* queueing is an anti-pattern to be purposefully avoided for latency tolerance reasons.
* <p>
* <b>Default Implementation</b>
* <p>
* Implementation returns {@link SynchronousQueue} when maxQueueSize <= 0 or {@link LinkedBlockingQueue} when maxQueueSize > 0.
*
* @param maxQueueSize
* The max size of the queue requested via properties (or system default if no properties set).
* @return instance of {@code BlockingQueue<Runnable>}
*/
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
/*
* We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
* <p>
* SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
* <p>
* Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
* and rejecting is the preferred solution.
*/
if (maxQueueSize <= 0) {
return new SynchronousQueue<Runnable>();
} else {
return new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
}
/**
* Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
* <p>
* This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
* <p>
* <b>Default Implementation</b>
* <p>
* Pass-thru that does no wrapping.
*
* @param callable
* {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
* @return {@code Callable<T>} either as a pass-thru or wrapping the one given
*/
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return callable;
}
/**
* Factory method to return an implementation of {@link HystrixRequestVariable} that behaves like a {@link ThreadLocal} except that it
* is scoped to a request instead of a thread.
* <p>
* For example, if a request starts with an HTTP request and ends with the HTTP response, then {@link HystrixRequestVariable} should
* be initialized at the beginning, available on any and all threads spawned during the request and then cleaned up once the HTTP request is completed.
* <p>
* If this method is implemented it is generally necessary to also implemented {@link #wrapCallable(Callable)} in order to copy state
* from parent to child thread.
*
* @param rv
* {@link HystrixRequestVariableLifecycle} with lifecycle implementations from Hystrix
* @return {@code HystrixRequestVariable<T>}
*/
public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
return new HystrixLifecycleForwardingRequestVariable<T>(rv);
}
}