/
TransProducer.java
139 lines (124 loc) · 6.97 KB
/
TransProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQTransProducer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.List;
/**
 * Sample that publishes transactional ("half") messages through the MQ HTTP client and then
 * resolves them.
 *
 * <p>Four messages are published. The first one is committed immediately with the handle returned
 * by the publish call. The remaining three are left unconfirmed and are resolved by a checker
 * thread that polls for half messages and commits or rolls them back depending on the value of the
 * {@code "a"} user property.
 */
public class TransProducer {

    /** User property that carries the loop index of each published message. */
    private static final String STEP_PROPERTY = "a";

    /** Number of half messages the checker thread resolves before it stops. */
    private static final int HALF_MESSAGES_TO_RESOLVE = 3;

    /**
     * Prints the details of a failed commit/rollback.
     *
     * <p>For an {@link AckMessageException} the request id and every failed handle with its error
     * code and message are printed. Any other throwable is printed as-is so that it is not lost.
     *
     * @param e the failure raised while committing or rolling back; {@code null} is ignored
     */
    static void processCommitRollError(Throwable e) {
        if (e == null) {
            return;
        }
        if (!(e instanceof AckMessageException)) {
            // Not an ack failure (e.g. a network or parsing problem): still make it visible.
            System.out.println("Commit/Roll transaction error: " + e);
            return;
        }
        AckMessageException errors = (AckMessageException) e;
        System.out.println("Commit/Roll transaction error, requestId is:" + errors.getRequestId() + ", fail handles:");
        if (errors.getErrorMessages() != null) {
            for (String errorHandle : errors.getErrorMessages().keySet()) {
                System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
            }
        }
    }

    /**
     * Publishes the sample transactional messages, resolves them and closes the client.
     *
     * @param args unused
     * @throws Throwable if publishing fails or the wait for the checker thread is interrupted
     */
    public static void main(String[] args) throws Throwable {
        MQClient mqClient = new MQClient(
                // HTTP endpoint of the service (public cloud production environment as an example)
                "${HTTP_ENDPOINT}",
                // AccessKey: Alibaba Cloud credential, created in the management console
                "${ACCESS_KEY}",
                // SecretKey: Alibaba Cloud credential, created in the management console
                "${SECRET_KEY}"
        );
        try {
            // Topic the messages belong to
            final String topic = "${TOPIC}";
            // Instance the topic belongs to; empty for the default instance
            final String instanceId = "${INSTANCE_ID}";
            // Consumer ID (Group ID) created in the console
            final String groupId = "${GROUP_ID}";
            final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topic, groupId);

            for (int i = 0; i < 4; i++) {
                TopicMessage topicMessage = new TopicMessage();
                topicMessage.setMessageBody("trans_msg");
                topicMessage.setMessageTag("a");
                topicMessage.setMessageKey(String.valueOf(System.currentTimeMillis()));
                // Time of the first transaction check, relative, in seconds, within 10~300s.
                // If the message is neither committed nor rolled back after the first check, it is
                // checked again roughly every 10s, for one day in total.
                topicMessage.setTransCheckImmunityTime(10);
                topicMessage.getProperties().put(STEP_PROPERTY, String.valueOf(i));

                TopicMessage pubResultMsg = mqTransProducer.publishMessage(topicMessage);
                if (pubResultMsg == null) {
                    // Guard before touching the result; nothing can be committed without it.
                    System.out.println("Send---->no publish result for message " + i);
                    continue;
                }
                System.out.println("Send---->msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()
                        + ", Handle: " + pubResultMsg.getReceiptHandle()
                );

                if (i == 0 && pubResultMsg.getReceiptHandle() != null) {
                    // The publish result carries the half-message handle, so the transaction can be
                    // committed or rolled back right away.
                    try {
                        mqTransProducer.commit(pubResultMsg.getReceiptHandle());
                        System.out.println(String.format("MessageId:%s, commit", pubResultMsg.getMessageId()));
                    } catch (Throwable e) {
                        // Commit/rollback fails once TransCheckImmunityTime has elapsed.
                        processCommitRollError(e);
                    }
                }
            }

            // The client needs a thread or process that consumes unconfirmed transactional
            // messages; this sample starts one thread for that purpose.
            Thread t = new Thread(new HalfMessageChecker(mqTransProducer));
            t.start();
            t.join();
        } finally {
            // Release the client even when publishing or waiting fails.
            mqClient.close();
        }
    }

    /**
     * Reads the numeric {@code "a"} property of a half message.
     *
     * @param message the half message to inspect
     * @return the parsed value, or {@code -1} when the property is missing or not a number
     */
    private static int readStep(Message message) {
        if (message.getProperties() == null) {
            return -1;
        }
        String raw = message.getProperties().get(STEP_PROPERTY);
        if (raw == null) {
            return -1;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException ignored) {
            // Treated like any other message this sample does not know how to resolve.
            return -1;
        }
    }

    /**
     * Polls for half messages and commits or rolls them back until
     * {@link #HALF_MESSAGES_TO_RESOLVE} messages have been resolved.
     */
    private static final class HalfMessageChecker implements Runnable {

        private final MQTransProducer producer;

        HalfMessageChecker(MQTransProducer producer) {
            this.producer = producer;
        }

        @Override
        public void run() {
            int resolved = 0;
            // "<" rather than "== 3": a batch may push the counter past the target.
            while (resolved < HALF_MESSAGES_TO_RESOLVE) {
                try {
                    List<Message> messages = producer.consumeHalfMessage(3, 3);
                    if (messages == null || messages.isEmpty()) {
                        System.out.println("No Half message!");
                        continue;
                    }
                    Message first = messages.get(0);
                    System.out.println(String.format("Half---->MessageId:%s,Properties:%s,Body:%s,Latency:%d",
                            first.getMessageId(),
                            first.getProperties(),
                            first.getMessageBodyString(),
                            System.currentTimeMillis() - first.getPublishTime()));
                    for (Message message : messages) {
                        try {
                            if (resolve(message)) {
                                resolved++;
                            }
                        } catch (Throwable e) {
                            // Commit/rollback fails once TransCheckImmunityTime has elapsed (handle
                            // from publishing) or after 10s (handle from consumeHalfMessage).
                            processCommitRollError(e);
                        }
                    }
                } catch (Throwable e) {
                    System.out.println(e.getMessage());
                }
            }
        }

        /**
         * Applies the sample's decision table to one half message.
         *
         * @param message the half message to resolve
         * @return {@code true} if the message was committed or rolled back, {@code false} if it
         *     was left for a later check
         */
        private boolean resolve(Message message) {
            int step = readStep(message);
            // Step 1 commits at once; step 2 commits only after it has been delivered more than once.
            if (step == 1 || (step == 2 && message.getConsumedTimes() > 1)) {
                producer.commit(message.getReceiptHandle());
                System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
                return true;
            }
            if (step == 3) {
                producer.rollback(message.getReceiptHandle());
                System.out.println(String.format("MessageId:%s, rollback", message.getMessageId()));
                return true;
            }
            // Do nothing; the message is checked again next time.
            System.out.println(String.format("MessageId:%s, unknown", message.getMessageId()));
            return false;
        }
    }
}