-
Notifications
You must be signed in to change notification settings - Fork 12k
CONSUME_FROM_TIMESTAMP doesn't work #2286
Description
BUG REPORT
1. Please describe the issue you observed:
【需求】:
使用PushConsumer订阅了一个topic,并且想只订阅从这个consumer订阅的时间点起往后的数据(不需要历史数据)。我的想法是设置consumer的ConsumeFromWhere.CONSUME_FROM_TIMESTAMP和当前时间作为timestamp,如下:
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
String timestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis());
consumer.setConsumeTimestamp(timestamp);
为了保证consumer启动的时候所在的consumergroup每次都是新的,我将consumergroup设置为时间戳:
Date date = new Date();
String strDateFormat = "yyyyMMddHHmmss";
SimpleDateFormat sdf = new SimpleDateFormat(strDateFormat);
String now = sdf.format(date);
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(now);
【操作】:
在启动这个consumer之前,我先写了一个producer,往这个topic中写数据(写了10条)。确保数据写完之后,我启动了consumer。
【实际结果】:
这个consumer依然读取到了这10条数据。
ConsumeFromWhere.CONSUME_FROM_TIMESTAMP没有起作用。
【预期结果】:
由于这个consumer启动晚于producer,所以应该读取不到这些数据。
2. Please tell us about your environment:
rocketmq 4.7.0 单broker;单namesrv
3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
完整代码如下:
producer:
public class SyncProducer {
public static String TOPIC_NAME = "newManualTopic";
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message(TOPIC_NAME /* Topic /,
"TagA" / Tag /,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) / Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
Consumer:
import static producer.SyncProducer.TOPIC_NAME;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
Date date = new Date();
String strDateFormat = "yyyyMMddHHmmss";
SimpleDateFormat sdf = new SimpleDateFormat(strDateFormat);
String now = sdf.format(date);
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(now);
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(TOPIC_NAME, "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);//ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
String timestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis());
consumer.setConsumeTimestamp(timestamp);
System.out.println("now:"+now);
System.out.println("timestamp:"+timestamp);
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}