A distributed, Redis-based delay queue written in Java. This library is designed for delay queues in distributed systems that do not require strictly exact delay times but may need to handle a large number of messages.
View the design notes
With maven:
<dependency>
<groupId>cn.anguslean</groupId>
<artifactId>delay-quene4j</artifactId>
<version>0.0.1</version>
</dependency>
With gradle:
implementation 'cn.anguslean:delay-quene4j:0.0.1'
To use delay-queue4j, follow these three steps:
- First, you must set RedisProvider, LockProvider, and JsonProvider. Currently this library only provides Jackson and Redisson implementations; if your project already uses them, you can use those implementations directly:
Config config = new Config();
config.useSingleServer()
.setAddress(host)
.setPassword(psd)
.setDatabase(1);
// Sync and Async API
RedissonClient redisson = Redisson.create(config);
RedisProvider redisProvider = new RedissonRedisProvider(redisson);
LockProvider lockProvider = new RedissonRedisLockProvider(redisson);
JsonProvider jsonProvider = new JacksonProvider();
DelayMsgConfig delayMsgConfig = new DelayMsgConfig(redisProvider, lockProvider, jsonProvider);
Take care, however, to avoid version conflicts with the Redisson and Jackson versions your project already depends on.
- Second, register your delay message handler callback:
// system is the key that this callback handler listens on
delayMsgConfig.addDelayCallBack(system, (uuid, message) -> {
System.out.println("Received message " + uuid + ", " + message);
latch.countDown();
});
- Third, publish your delay message:
delayMsgConfig.addDelayMessage(DelayedInfoDTO.builder()
// floorMod keeps the result in [0, 19]; Math.abs(Long.MIN_VALUE) is still negative
.delayTime(Math.floorMod(new Random().nextLong(), 20L))
// callback handler key
.system(system)
.message(system + String.format("%2d", new Random().nextInt(100)))
.uuid(UUID.randomUUID().toString())
.build());
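The three steps above amount to registering a callback under a key, then publishing messages that carry the same key and a delay. The library's internals are not shown here, so the following is only a minimal in-memory sketch of that flow, not the library's implementation. `InMemoryDelayQueueSketch`, `register`, `publish`, and `poll` are hypothetical names, a `TreeMap` stands in for a Redis sorted set ordered by due time, and delays are assumed to be in milliseconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical in-memory illustration; not the library's implementation.
final class InMemoryDelayQueueSketch {

    // One pending message: callback key, id, and payload.
    private static final class Pending {
        final String system;
        final String uuid;
        final String message;

        Pending(String system, String uuid, String message) {
            this.system = system;
            this.uuid = uuid;
            this.message = message;
        }
    }

    // Callback handlers keyed by "system".
    private final Map<String, BiConsumer<String, String>> callbacks = new HashMap<>();

    // Due time (millis) -> messages due then; stands in for a Redis sorted set.
    private final TreeMap<Long, List<Pending>> byDueTime = new TreeMap<>();

    void register(String system, BiConsumer<String, String> callback) {
        callbacks.put(system, callback);
    }

    void publish(String system, String uuid, String message, long nowMillis, long delayMillis) {
        byDueTime.computeIfAbsent(nowMillis + delayMillis, k -> new ArrayList<>())
                .add(new Pending(system, uuid, message));
    }

    // Removes every message whose due time is <= nowMillis and dispatches it to the
    // callback registered for its key. Returns the number of callbacks invoked.
    // Messages whose key has no callback are simply dropped in this sketch.
    int poll(long nowMillis) {
        Map<Long, List<Pending>> head = byDueTime.headMap(nowMillis, true);
        List<Pending> due = new ArrayList<>();
        for (List<Pending> batch : head.values()) {
            due.addAll(batch);
        }
        head.clear(); // clearing the view removes the entries from byDueTime

        int delivered = 0;
        for (Pending p : due) {
            BiConsumer<String, String> callback = callbacks.get(p.system);
            if (callback != null) {
                callback.accept(p.uuid, p.message);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        InMemoryDelayQueueSketch queue = new InMemoryDelayQueueSketch();
        queue.register("DELAY-ATEST1",
                (uuid, message) -> System.out.println("Received message " + uuid + ", " + message));
        queue.publish("DELAY-ATEST1", "id-1", "hello", 0L, 5_000L);
        System.out.println(queue.poll(4_999L)); // not yet due, nothing delivered
        System.out.println(queue.poll(6_000L)); // due: the callback runs first
    }
}
```

Because messages are only delivered when `poll` runs, delivery can lag the due time by up to one poll interval. That is the kind of imprecision a queue without strict delay guarantees accepts in exchange for simplicity.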
Test Demo:
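import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
// Also import DelayMsgConfig and DelayedInfoDTO from this library.
public class DelayMsgConfigTest1 {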
// Maximum allowed delay error
private static final int MAX_MARGIN = 3;
private DelayMsgConfig delayMsgConfig;
@Before
public void setUp() {
String host = "redis://redis.dev1.ctstest.com:6379";
String psd = "password";
Config config = new Config();
config.useSingleServer()
.setAddress(host)
.setPassword(psd)
.setDatabase(1);
// Sync and Async API
RedissonClient redisson = Redisson.create(config);
delayMsgConfig = new DelayMsgConfig(redisson);
}
@Test
public void addData() throws Exception {
delayMsgConfig.setCorePoolSize(10);
delayMsgConfig.setMaximumPoolSize(20);
delayMsgConfig.begin();
TimeUnit.SECONDS.sleep(2);
String system = "DELAY-ATEST1";
CountDownLatch latch = new CountDownLatch(1);
// Step 1: register the delay message callback
delayMsgConfig.addDelayCallBack(system, (uuid, message) -> {
System.out.println("Received message " + uuid + ", " + message);
latch.countDown();
});
// Step 2: add a delay message; its system key must match the callback's key
delayMsgConfig.addDelayMessage(DelayedInfoDTO.builder()
// floorMod keeps the result in [0, 19]; Math.abs(Long.MIN_VALUE) is still negative
.delayTime(Math.floorMod(new Random().nextLong(), 20L))
.system(system)
.message(system + String.format("%2d", new Random().nextInt(100)))
.uuid(UUID.randomUUID().toString())
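.build());
// Block until the callback fires; otherwise the test returns before the delayed message arrives
latch.await();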
}
}
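The demo declares MAX_MARGIN but does not check it. Below is a sketch of how a test might verify that a message arrived within the allowed error. It assumes that the delay and the margin are both in seconds (the source does not state the unit) and that a message is never delivered early. `DelayMarginCheck` and `withinMargin` are hypothetical helpers, not part of the library.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of delay-queue4j.
final class DelayMarginCheck {

    private DelayMarginCheck() {
    }

    // Returns true if the message arrived no earlier than the requested delay
    // and no more than marginSeconds after it. Timestamps are in milliseconds;
    // the requested delay and the margin are in seconds (an assumption).
    static boolean withinMargin(long publishedAtMillis, long receivedAtMillis,
                                long requestedDelaySeconds, long marginSeconds) {
        long observedMillis = receivedAtMillis - publishedAtMillis;
        long minMillis = TimeUnit.SECONDS.toMillis(requestedDelaySeconds);
        long maxMillis = TimeUnit.SECONDS.toMillis(requestedDelaySeconds + marginSeconds);
        return observedMillis >= minMillis && observedMillis <= maxMillis;
    }

    public static void main(String[] args) {
        // Requested delay 10 s, margin 3 s.
        System.out.println(withinMargin(0L, 12_000L, 10L, 3L)); // 2 s late: prints true
        System.out.println(withinMargin(0L, 14_000L, 10L, 3L)); // 4 s late: prints false
    }
}
```

In the demo, the callback could record `System.currentTimeMillis()` when it runs, and the test could assert `withinMargin(...)` with `MAX_MARGIN` once the latch has been released.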