GetAndWrapmessage method in rocketmqUtil is missing prefix when getting keys #294

Closed
cj-8480 opened this issue Sep 2, 2020 · 5 comments
Labels
bug Something isn't working

Comments

@cj-8480
Contributor

cj-8480 commented Sep 2, 2020

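Taking the code on the master branch as an example:
1. When a Message is first converted with the method shown in the image below, a prefix is added to the header keys.
image
The toRocketHeaderKey method is shown in the image below.
image

2. After a Spring Message is passed to RocketMQTemplate.send, the utility class calls the method shown in the image below to convert the message.
image
At this point the KEYS parameter is lost, because the lookup does not include the prefix.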
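
Since the screenshots are not available here, the mismatch described in the two steps above can be sketched with plain maps. This is only an illustration: the class name, the helper names other than toRocketHeaderKey, and the "rocketmq_" prefix value are assumptions, and no rocketmq-spring classes are used.

```java
import java.util.HashMap;
import java.util.Map;

class KeysPrefixMismatch {
    // Assumed values for illustration; the real constants live in RocketMQHeaders.
    static final String PREFIX = "rocketmq_";
    static final String KEYS = "KEYS";

    // Stand-in for toRocketHeaderKey: prepends the prefix to a raw header name.
    static String toRocketHeaderKey(String rawKey) {
        return PREFIX + rawKey;
    }

    // Stand-in for step 1: the conversion stores KEYS under the prefixed name.
    static Map<String, Object> convertToSpringHeaders(String keys) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(toRocketHeaderKey(KEYS), keys);
        return headers;
    }

    // Stand-in for step 2: the lookup uses the unprefixed name only.
    static Object lookupKeysWithoutPrefix(Map<String, Object> headers) {
        return headers.get(KEYS);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = convertToSpringHeaders("order-1001");
        // The unprefixed lookup misses the value that was stored with the prefix.
        System.out.println("unprefixed lookup: " + lookupKeysWithoutPrefix(headers));
        System.out.println("prefixed lookup: " + headers.get(toRocketHeaderKey(KEYS)));
    }
}
```

The value is still in the headers, but only under the prefixed name, so a lookup by the bare KEYS name finds nothing.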

@RongtongJin RongtongJin changed the title RocketMQUtil中 getAndWrapMessage 在获取 KEYS时缺少前缀 GetAndWrapmessage method in rocketmqUtil is missing prefix when getting keys Sep 4, 2020
@RongtongJin
Contributor

@cj-8480 Could you help fix the issue and verify it with a unit test?

@cj-8480
Contributor Author

cj-8480 commented Sep 7, 2020

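@RongtongJin
Adding the prefix directly would probably affect other code that already relies on the current behavior.
For now I have worked around it by adding the following method to my own program.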


private Message<?> convertToSpringMessage(org.apache.rocketmq.common.message.Message message) {
	Message<?> converted = RocketMQUtil.convertToSpringMessage(message);
	// Add the KEYS header to the converted message so that KEYS is no longer empty
	// --- start ---
	if (converted instanceof GenericMessage) {
		// Copy the existing headers, then set KEYS from the original RocketMQ message
		Map<String, Object> headers = new HashMap<>(converted.getHeaders());
		headers.put(RocketMQHeaders.KEYS, message.getKeys());
		return new GenericMessage<>(converted.getPayload(), headers);
	}
	// --- end ---

	// Fall back to the unmodified message instead of returning null
	return converted;
}

@cj-8480
Contributor Author

cj-8480 commented Sep 7, 2020

@RongtongJin I don't think I have commit access.
If I were to make the change, the method would look like this afterwards:

private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
    if (destination == null || destination.length() < 1) {
        return null;
    }
    if (payloads == null || payloads.length < 1) {
        return null;
    }
    String[] tempArr = destination.split(":", 2);
    String topic = tempArr[0];
    String tags = "";
    if (tempArr.length > 1) {
        tags = tempArr[1];
    }
    Message rocketMsg = new Message(topic, tags, payloads);
    if (Objects.nonNull(headers) && !headers.isEmpty()) {

        // Changed section --- start ---
        // First try to read the keys without the prefix
        Object keys = headers.get(RocketMQHeaders.KEYS);
        // If nothing is found, fall back to the prefixed keys header
        if (StringUtils.isEmpty(keys)) {
            keys = headers.get(toRocketHeaderKey(RocketMQHeaders.KEYS));
        }
        // Changed section --- end ---

        if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
            rocketMsg.setKeys(keys.toString());
        }
        Object flagObj = headers.getOrDefault("FLAG", "0");
        int flag = 0;
        try {
            flag = Integer.parseInt(flagObj.toString());
        } catch (NumberFormatException e) {
            // Ignore it
            if (log.isInfoEnabled()) {
                log.info("flag must be integer, flagObj:{}", flagObj);
            }
        }
        rocketMsg.setFlag(flag);
        Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
        rocketMsg.setWaitStoreMsgOK(Boolean.TRUE.equals(waitStoreMsgOkObj));
        headers.entrySet().stream()
            .filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
                && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
            .forEach(entry -> {
                if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
                    rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
                }
            });

    }
    return rocketMsg;
}

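The only changes are in the section between the start and end markers.
I added a fallback: when the original logic cannot find the keys, it looks them up again using the prefixed header name.
This stays compatible with the existing behavior, and the keys parameter is now retrieved correctly.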
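
The fallback order described above can be checked in isolation with a small sketch. It uses plain maps instead of MessageHeaders, and the class name, the resolveKeys and isEmpty helpers, and the "rocketmq_" prefix value are assumptions made for illustration rather than the library's code.

```java
import java.util.HashMap;
import java.util.Map;

class KeysFallbackLookup {
    // Assumed values for illustration; the real constants live in RocketMQHeaders.
    static final String PREFIX = "rocketmq_";
    static final String KEYS = "KEYS";

    // Stand-in for toRocketHeaderKey: prepends the prefix to a raw header name.
    static String toRocketHeaderKey(String rawKey) {
        return PREFIX + rawKey;
    }

    // Simplified emptiness check: null or the empty string.
    static boolean isEmpty(Object value) {
        return value == null || "".equals(value);
    }

    // Try the unprefixed header first, then fall back to the prefixed one.
    static Object resolveKeys(Map<String, Object> headers) {
        Object keys = headers.get(KEYS);
        if (isEmpty(keys)) {
            keys = headers.get(toRocketHeaderKey(KEYS));
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> unprefixedOnly = new HashMap<>();
        unprefixedOnly.put(KEYS, "from-plain");

        Map<String, Object> prefixedOnly = new HashMap<>();
        prefixedOnly.put(toRocketHeaderKey(KEYS), "from-prefixed");

        Map<String, Object> both = new HashMap<>(unprefixedOnly);
        both.putAll(prefixedOnly);

        System.out.println(resolveKeys(unprefixedOnly));
        System.out.println(resolveKeys(prefixedOnly));
        System.out.println(resolveKeys(both));
    }
}
```

Because the unprefixed header is consulted first, callers that already set KEYS directly keep their current behavior, and the prefixed header is only used when the plain one is absent or empty.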

@RongtongJin
Contributor

I think your code is OK. You can submit a pull request, and I can help review and merge it.

@cj-8480
Contributor Author

cj-8480 commented Sep 16, 2020

@RongtongJin OK, I have submitted a pull request.

RongtongJin pushed a commit that referenced this issue Sep 17, 2020
@RongtongJin RongtongJin added the bug Something isn't working label Sep 17, 2020