Skip to content

Commit 92a01db

Browse files
Dave Syerartembilan
authored andcommitted
Migrate leader election support from Spring Cloud
Fix `LeaderInitiator` * Implement `SmartLifecycle` instead of just `Lifecycle` * Add `Assert`s * Rework logic from `IMap` for locks just into single `ILock`, since we require it only for the `role` as a key * Fix `Future.cancel(true)` logic via rescheduling See https://jira.spring.io/browse/INT-4058 and its commit comments * Add more complex test-case to meet distributed requirements and verify several `yield()` cycles
1 parent 8dcca5b commit 92a01db

File tree

3 files changed

+606
-0
lines changed

3 files changed

+606
-0
lines changed

spring-integration-hazelcast/README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,3 +458,25 @@ public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler()
458458
}
459459
460460
```
461+
462+
## HAZELCAST LEADER ELECTION
463+
464+
If you need to elect a leader (e.g. for highly available message consumer where only one node should receive messages)
465+
you just need to create a `LeaderInitiator`. Example:
466+
467+
```java
468+
@Bean
469+
public HazelcastInstance hazelcastInstance() {
470+
return Hazelcast.newHazelcastInstance();
471+
}
472+
473+
@Bean
474+
public LeaderInitiator initiator() {
475+
LeaderInitiator initiator = new LeaderInitiator(hazelcastInstance());
476+
return initiator;
477+
}
478+
```
479+
480+
Then when a node is elected leader it will send `OnGrantedEvent` to all application listeners. See
481+
the [Spring Integration User Guide](http://docs.spring.io/spring-integration/reference/htmlsingle/#endpoint-roles)
482+
for more information on how to use those events to control messaging endpoints.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
/*
2+
* Copyright 2014-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.hazelcast.leader;
18+
19+
import java.util.concurrent.Callable;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.Future;
23+
import java.util.concurrent.ThreadFactory;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.springframework.beans.factory.DisposableBean;
27+
import org.springframework.context.ApplicationEventPublisher;
28+
import org.springframework.context.ApplicationEventPublisherAware;
29+
import org.springframework.context.SmartLifecycle;
30+
import org.springframework.integration.leader.Candidate;
31+
import org.springframework.integration.leader.Context;
32+
import org.springframework.integration.leader.DefaultCandidate;
33+
import org.springframework.integration.leader.event.DefaultLeaderEventPublisher;
34+
import org.springframework.integration.leader.event.LeaderEventPublisher;
35+
import org.springframework.util.Assert;
36+
37+
import com.hazelcast.core.HazelcastInstance;
38+
import com.hazelcast.core.ILock;
39+
40+
/**
41+
* Bootstrap leadership {@link org.springframework.integration.leader.Candidate candidates}
42+
* with Hazelcast. Upon construction, {@link #start} must be invoked to
43+
* register the candidate for leadership election.
44+
*
45+
* @author Patrick Peralta
46+
* @author Gary Russell
47+
* @author Dave Syer
48+
* @author Artem Bilan
49+
*/
50+
public class LeaderInitiator implements SmartLifecycle, DisposableBean, ApplicationEventPublisherAware {
51+
52+
private static int threadNameCount = 0;
53+
54+
private static final Context NULL_CONTEXT = new NullContext();
55+
56+
/**
57+
* Hazelcast client.
58+
*/
59+
private final HazelcastInstance client;
60+
61+
/**
62+
* Candidate for leader election.
63+
*/
64+
private final Candidate candidate;
65+
66+
/**
67+
* Executor service for running leadership daemon.
68+
*/
69+
private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
70+
71+
@Override
72+
public Thread newThread(Runnable r) {
73+
Thread thread = new Thread(r, "Hazelcast-leadership-" + (threadNameCount++));
74+
thread.setDaemon(true);
75+
return thread;
76+
}
77+
78+
});
79+
80+
/**
81+
* Future returned by submitting an {@link LeaderSelector} to {@link #executorService}.
82+
* This is used to cancel leadership.
83+
*/
84+
private volatile Future<Void> future;
85+
86+
/**
87+
* Hazelcast distributed lock.
88+
*/
89+
private volatile ILock lock;
90+
91+
private LeaderSelector leaderSelector;
92+
93+
/**
94+
* Leader event publisher.
95+
*/
96+
private volatile LeaderEventPublisher leaderEventPublisher = new DefaultLeaderEventPublisher();
97+
98+
private boolean customPublisher = false;
99+
100+
/**
101+
* @see SmartLifecycle
102+
*/
103+
private volatile boolean autoStartup = true;
104+
105+
/**
106+
* @see SmartLifecycle which is an extension of org.springframework.context.Phased
107+
*/
108+
private volatile int phase;
109+
110+
/**
111+
* Flag that indicates whether the leadership election for
112+
* this {@link #candidate} is running.
113+
*/
114+
private volatile boolean running;
115+
116+
/**
117+
* Construct a {@link LeaderInitiator} with a default candidate.
118+
*
119+
* @param client Hazelcast client
120+
*/
121+
public LeaderInitiator(HazelcastInstance client) {
122+
this(client, new DefaultCandidate());
123+
}
124+
125+
/**
126+
* Construct a {@link LeaderInitiator}.
127+
*
128+
* @param client Hazelcast client
129+
* @param candidate leadership election candidate
130+
*/
131+
public LeaderInitiator(HazelcastInstance client, Candidate candidate) {
132+
Assert.notNull(client, "'client' must not be null");
133+
Assert.notNull(candidate, "'candidate' must not be null");
134+
this.client = client;
135+
this.candidate = candidate;
136+
}
137+
138+
/**
139+
* Sets the {@link LeaderEventPublisher}.
140+
* @param leaderEventPublisher the event publisher
141+
*/
142+
public void setLeaderEventPublisher(LeaderEventPublisher leaderEventPublisher) {
143+
Assert.notNull(leaderEventPublisher);
144+
this.leaderEventPublisher = leaderEventPublisher;
145+
this.customPublisher = true;
146+
}
147+
148+
/**
149+
* @return the context (or null if not running)
150+
*/
151+
public Context getContext() {
152+
if (this.leaderSelector == null) {
153+
return NULL_CONTEXT;
154+
}
155+
return this.leaderSelector.context;
156+
}
157+
158+
@Override
159+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
160+
if (!this.customPublisher) {
161+
this.leaderEventPublisher = new DefaultLeaderEventPublisher(applicationEventPublisher);
162+
}
163+
}
164+
165+
public void setAutoStartup(boolean autoStartup) {
166+
this.autoStartup = autoStartup;
167+
}
168+
169+
@Override
170+
public boolean isAutoStartup() {
171+
return this.autoStartup;
172+
}
173+
174+
public void setPhase(int phase) {
175+
this.phase = phase;
176+
}
177+
178+
@Override
179+
public int getPhase() {
180+
return this.phase;
181+
}
182+
183+
/**
184+
* Start the registration of the {@link #candidate} for leader election.
185+
*/
186+
@Override
187+
public synchronized void start() {
188+
if (!this.running) {
189+
this.lock = this.client.getLock(this.candidate.getRole());
190+
this.running = true;
191+
this.leaderSelector = new LeaderSelector();
192+
this.future = this.executorService.submit(this.leaderSelector);
193+
}
194+
}
195+
196+
@Override
197+
public void stop(Runnable callback) {
198+
stop();
199+
if (callback != null) {
200+
callback.run();
201+
}
202+
}
203+
204+
/**
205+
* Stop the registration of the {@link #candidate} for leader election.
206+
* If the candidate is currently leader, its leadership will be revoked.
207+
*/
208+
@Override
209+
public synchronized void stop() {
210+
if (this.running) {
211+
this.running = false;
212+
this.future.cancel(true);
213+
}
214+
}
215+
216+
/**
217+
* @return true if leadership election for this {@link #candidate} is running
218+
*/
219+
@Override
220+
public boolean isRunning() {
221+
return this.running;
222+
}
223+
224+
@Override
225+
public void destroy() throws Exception {
226+
stop();
227+
this.executorService.shutdown();
228+
}
229+
230+
/**
231+
* Callable that manages the acquisition of Hazelcast locks
232+
* for leadership election.
233+
*/
234+
protected class LeaderSelector implements Callable<Void> {
235+
236+
protected final HazelcastContext context = new HazelcastContext();
237+
238+
protected final String role = LeaderInitiator.this.candidate.getRole();
239+
240+
private volatile boolean locked = false;
241+
242+
@Override
243+
public Void call() throws Exception {
244+
try {
245+
while (LeaderInitiator.this.running) {
246+
try {
247+
this.locked = LeaderInitiator.this.lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
248+
if (this.locked) {
249+
LeaderInitiator.this.leaderEventPublisher.publishOnGranted(LeaderInitiator.this,
250+
this.context, this.role);
251+
LeaderInitiator.this.candidate.onGranted(context);
252+
Thread.sleep(Long.MAX_VALUE);
253+
}
254+
}
255+
catch (InterruptedException e) {
256+
if (this.locked) {
257+
LeaderInitiator.this.lock.unlock();
258+
this.locked = false;
259+
// The lock was broken and we are no longer leader
260+
LeaderInitiator.this.candidate.onRevoked(this.context);
261+
if (LeaderInitiator.this.leaderEventPublisher != null) {
262+
LeaderInitiator.this.leaderEventPublisher.publishOnRevoked(
263+
LeaderInitiator.this, this.context,
264+
LeaderInitiator.this.candidate.getRole());
265+
}
266+
Thread.currentThread().interrupt();
267+
return null;
268+
}
269+
}
270+
}
271+
}
272+
finally {
273+
if (this.locked) {
274+
LeaderInitiator.this.lock.unlock();
275+
this.locked = false;
276+
// We are stopping, therefore not leading any more
277+
LeaderInitiator.this.candidate.onRevoked(this.context);
278+
if (LeaderInitiator.this.leaderEventPublisher != null) {
279+
LeaderInitiator.this.leaderEventPublisher.publishOnRevoked(
280+
LeaderInitiator.this, this.context, this.role);
281+
}
282+
}
283+
}
284+
return null;
285+
}
286+
287+
}
288+
289+
/**
290+
* Implementation of leadership context backed by Hazelcast.
291+
*/
292+
protected class HazelcastContext implements Context {
293+
294+
@Override
295+
public boolean isLeader() {
296+
return LeaderInitiator.this.leaderSelector.locked;
297+
}
298+
299+
@Override
300+
public void yield() {
301+
if (LeaderInitiator.this.future != null) {
302+
LeaderInitiator.this.future.cancel(true);
303+
LeaderInitiator.this.future =
304+
LeaderInitiator.this.executorService.submit(LeaderInitiator.this.leaderSelector);
305+
}
306+
}
307+
308+
@Override
309+
public String toString() {
310+
return "HazelcastContext{role=" + LeaderInitiator.this.candidate.getRole() +
311+
", id=" + LeaderInitiator.this.candidate.getId() +
312+
", isLeader=" + isLeader() + "}";
313+
}
314+
315+
}
316+
317+
private static final class NullContext implements Context {
318+
319+
@Override
320+
public boolean isLeader() {
321+
return false;
322+
}
323+
324+
@Override
325+
public void yield() {
326+
// No-op
327+
}
328+
329+
}
330+
331+
}

0 commit comments

Comments
 (0)