/
ReliableTopicConfig.java
318 lines (283 loc) · 11.8 KB
/
ReliableTopicConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
/*
* Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hazelcast.config;
import com.hazelcast.spi.annotation.Beta;
import com.hazelcast.topic.TopicOverloadPolicy;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;

import static com.hazelcast.topic.TopicOverloadPolicy.BLOCK;
import static com.hazelcast.util.Preconditions.checkHasText;
import static com.hazelcast.util.Preconditions.checkNotNull;
import static com.hazelcast.util.Preconditions.checkPositive;
/**
 * Configuration for a reliable {@link com.hazelcast.core.ITopic}.
 * <p>
 * The reliable topic makes use of the {@link com.hazelcast.ringbuffer.Ringbuffer} to store the actual messages.
 * <p>
 * To configure the ringbuffer for a reliable topic, define a ringbuffer in the config with exactly the same name.
 * It is very unlikely that you want to run with the default settings.
 * <p>
 * When a ReliableTopic starts, it will always start from the tail+1 item from the RingBuffer. It will not chew its
 * way through all available events but it will wait for the next item being published.
 * <p>
 * In the reliable topic, global order is always maintained, so all listeners will observe exactly the same order
 * of sequence of messages.
 */
@Beta
public class ReliableTopicConfig {

    /**
     * The default read batch size.
     */
    public static final int DEFAULT_READ_BATCH_SIZE = 10;

    /**
     * The default slow consumer policy.
     */
    public static final TopicOverloadPolicy DEFAULT_TOPIC_OVERLOAD_POLICY = BLOCK;

    /**
     * Default value for statistics enabled.
     */
    public static final boolean DEFAULT_STATISTICS_ENABLED = true;

    private Executor executor;
    private int readBatchSize = DEFAULT_READ_BATCH_SIZE;
    private String name;
    private boolean statisticsEnabled = DEFAULT_STATISTICS_ENABLED;
    private List<ListenerConfig> listenerConfigs = new LinkedList<ListenerConfig>();
    private TopicOverloadPolicy topicOverloadPolicy = DEFAULT_TOPIC_OVERLOAD_POLICY;

    /**
     * Creates a new ReliableTopicConfig with default settings and no name.
     */
    public ReliableTopicConfig() {
    }

    /**
     * Creates a new ReliableTopicConfig with default settings and the given name.
     *
     * @param name the name of the reliable topic
     * @throws NullPointerException if name is null.
     */
    public ReliableTopicConfig(String name) {
        this.name = checkNotNull(name, "name");
    }

    /**
     * Creates a new ReliableTopicConfig by cloning an existing one.
     * <p>
     * The list of listener configurations is copied, so adding a listener to the new config does not
     * modify the config it was cloned from (and vice versa). The ListenerConfig elements themselves and
     * the Executor are shared, not cloned.
     *
     * @param config the ReliableTopicConfig to clone.
     * @throws NullPointerException if config is null.
     */
    ReliableTopicConfig(ReliableTopicConfig config) {
        this.name = config.name;
        this.statisticsEnabled = config.statisticsEnabled;
        this.readBatchSize = config.readBatchSize;
        this.executor = config.executor;
        this.topicOverloadPolicy = config.topicOverloadPolicy;
        // Defensive copy: sharing the list instance would let a change made through one config
        // silently show up in every config derived from it.
        this.listenerConfigs = new LinkedList<ListenerConfig>(config.listenerConfigs);
    }

    /**
     * Creates a new ReliableTopicConfig by cloning an existing one and giving the clone a different name.
     * <p>
     * The name is stored as given; it is not validated.
     *
     * @param config the ReliableTopicConfig to clone.
     * @param name   the name of the new config.
     * @throws NullPointerException if config is null.
     */
    ReliableTopicConfig(ReliableTopicConfig config, String name) {
        this(config);
        this.name = name;
    }

    /**
     * Sets the name of the reliable topic.
     *
     * @param name the name of the reliable topic
     * @return the updated ReliableTopicConfig
     * @throws IllegalArgumentException if name is null or an empty string.
     */
    public ReliableTopicConfig setName(String name) {
        this.name = checkHasText(name, "name must contain text");
        return this;
    }

    /**
     * Gets the name of the reliable topic.
     *
     * @return the name of the reliable topic, or null if no name has been set.
     */
    public String getName() {
        return name;
    }

    /**
     * Gets the TopicOverloadPolicy for this reliable topic.
     *
     * @return the TopicOverloadPolicy.
     */
    public TopicOverloadPolicy getTopicOverloadPolicy() {
        return topicOverloadPolicy;
    }

    /**
     * Sets the TopicOverloadPolicy for this reliable topic. Check the {@link TopicOverloadPolicy} for more
     * details about this setting.
     *
     * @param topicOverloadPolicy the new TopicOverloadPolicy.
     * @return the updated reliable topic config.
     * @throws NullPointerException if topicOverloadPolicy is null.
     */
    public ReliableTopicConfig setTopicOverloadPolicy(TopicOverloadPolicy topicOverloadPolicy) {
        this.topicOverloadPolicy = checkNotNull(topicOverloadPolicy, "topicOverloadPolicy can't be null");
        return this;
    }

    /**
     * Gets the Executor that is going to process the events.
     * <p>
     * If no Executor is selected, then the {@link com.hazelcast.spi.ExecutionService#ASYNC_EXECUTOR} is used.
     *
     * @return the Executor used to process events, or null if none has been set.
     * @see #setExecutor(java.util.concurrent.Executor)
     */
    public Executor getExecutor() {
        return executor;
    }

    /**
     * Sets the Executor that is going to process the event.
     * <p>
     * In some cases it is desirable to set a specific Executor. For example, you may want to isolate a certain
     * topic from other topics because it contains long running messages or very high priority messages.
     * <p>
     * A single Executor can be shared between multiple Reliable topics, although it could take more time to
     * process a message. If a single Executor is not shared with other reliable topics, then the Executor only
     * needs to have a single thread.
     *
     * @param executor the Executor. If the executor is null, the
     *                 {@link com.hazelcast.spi.ExecutionService#ASYNC_EXECUTOR} will be used to process the event.
     * @return the updated config.
     */
    public ReliableTopicConfig setExecutor(Executor executor) {
        this.executor = executor;
        return this;
    }

    /**
     * Gets the maximum number of items to read in a batch. Returned value will always be equal or larger than 1.
     *
     * @return the read batch size.
     */
    public int getReadBatchSize() {
        return readBatchSize;
    }

    /**
     * Sets the read batch size.
     * <p>
     * The ReliableTopic tries to read a batch of messages from the ringbuffer. It will get at least one, but
     * if there are more available, then it will try to get more to increase throughput. The maximum number of
     * messages read in one go can be influenced using the read batch size.
     * <p>
     * Apart from influencing the number of messages to download, the readBatchSize also determines how many
     * messages will be processed by the thread running the MessageListener before it returns back to the pool
     * to look for other MessageListeners that need to be processed. The problem with returning to the pool and
     * looking for new work is that interacting with an Executor is quite expensive due to contention on the
     * work-queue. The more work that can be done without returning to the pool, the smaller the overhead.
     * <p>
     * If the readBatchSize is 10 and there are 50 messages available, 10 items are retrieved and processed
     * consecutively before the thread goes back to the pool and helps out with the processing of other messages.
     * <p>
     * If the readBatchSize is 10 and there are 2 items available, 2 items are retrieved and processed
     * consecutively.
     * <p>
     * If the readBatchSize is an issue because a thread will be busy too long with processing a single
     * MessageListener and it can't help out other MessageListeners, increase the size of the threadpool so the
     * other MessageListeners don't need to wait for a thread, but can be processed in parallel.
     *
     * @param readBatchSize the maximum number of items to read in a batch.
     * @return the updated reliable topic config.
     * @throws IllegalArgumentException if readBatchSize is smaller than 1.
     */
    public ReliableTopicConfig setReadBatchSize(int readBatchSize) {
        this.readBatchSize = checkPositive(readBatchSize, "readBatchSize should be positive");
        return this;
    }

    /**
     * Checks if statistics are enabled for this reliable topic.
     *
     * @return true if enabled, false otherwise.
     */
    public boolean isStatisticsEnabled() {
        return statisticsEnabled;
    }

    /**
     * Enables or disables statistics for this reliable topic.
     *
     * @param statisticsEnabled true to enable statistics, false to disable.
     * @return the updated reliable topic config.
     */
    public ReliableTopicConfig setStatisticsEnabled(boolean statisticsEnabled) {
        this.statisticsEnabled = statisticsEnabled;
        return this;
    }

    /**
     * Sets the list of message listeners (listens for when messages are added or removed) for this topic.
     * <p>
     * The given list is stored as is (it is not copied). Passing null resets the configuration to a new,
     * empty list.
     *
     * @param listenerConfigs The list of message listeners for this topic.
     * @return This updated topic configuration.
     */
    public ReliableTopicConfig setMessageListenerConfigs(List<ListenerConfig> listenerConfigs) {
        this.listenerConfigs = listenerConfigs != null ? listenerConfigs : new LinkedList<ListenerConfig>();
        return this;
    }

    /**
     * Gets the list of message listeners (listens for when messages are added or removed) for this reliable topic.
     *
     * @return list of MessageListener configurations; never null.
     */
    public List<ListenerConfig> getMessageListenerConfigs() {
        return listenerConfigs;
    }

    /**
     * Adds a message listener (listens for when messages are added or removed) to this reliable topic.
     *
     * @param listenerConfig the ListenerConfig to add.
     * @return the updated config.
     * @throws NullPointerException if listenerConfig is null.
     */
    public ReliableTopicConfig addMessageListenerConfig(ListenerConfig listenerConfig) {
        checkNotNull(listenerConfig, "listenerConfig can't be null");
        listenerConfigs.add(listenerConfig);
        return this;
    }

    @Override
    public String toString() {
        return "ReliableTopicConfig{"
                + "name='" + name + '\''
                + ", topicOverloadPolicy=" + topicOverloadPolicy
                + ", executor=" + executor
                + ", readBatchSize=" + readBatchSize
                + ", statisticsEnabled=" + statisticsEnabled
                + ", listenerConfigs=" + listenerConfigs
                + '}';
    }

    /**
     * Gets immutable version of this configuration.
     * <p>
     * The returned object is a snapshot: it holds the settings this config had when the method was called,
     * and every mutating method on it throws {@link UnsupportedOperationException}.
     *
     * @return Immutable version of this configuration.
     * @deprecated this method will be removed in 4.0; it is meant for internal usage only.
     */
    @Deprecated
    public ReliableTopicConfig getAsReadOnly() {
        return new ReliableTopicConfigReadOnly(this);
    }

    /**
     * Read-only variant of {@link ReliableTopicConfig}: every mutator throws
     * {@link UnsupportedOperationException} and the listener list is only handed out as an unmodifiable view.
     */
    static class ReliableTopicConfigReadOnly extends ReliableTopicConfig {

        private static final String READ_ONLY_MESSAGE = "This config is read-only";

        ReliableTopicConfigReadOnly(ReliableTopicConfig config) {
            super(config);
        }

        @Override
        public ReliableTopicConfig setName(String name) {
            throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
        }

        @Override
        public ReliableTopicConfig setExecutor(Executor executor) {
            throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
        }

        @Override
        public ReliableTopicConfig setReadBatchSize(int readBatchSize) {
            throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
        }

        @Override
        public ReliableTopicConfig setStatisticsEnabled(boolean statisticsEnabled) {
            throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
        }

        @Override
        public ReliableTopicConfig setMessageListenerConfigs(List<ListenerConfig> listenerConfigs) {
            throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
        }

        /**
         * Returns an unmodifiable view of the listener configurations, so the read-only config cannot be
         * changed through the returned list.
         */
        @Override
        public List<ListenerConfig> getMessageListenerConfigs() {
            return Collections.unmodifiableList(super.getMessageListenerConfigs());
        }

        @Override
        public ReliableTopicConfig addMessageListenerConfig(ListenerConfig listenerConfig) {
            throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
        }

        @Override
        public ReliableTopicConfig setTopicOverloadPolicy(TopicOverloadPolicy topicOverloadPolicy) {
            throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
        }
    }
}