-
Notifications
You must be signed in to change notification settings - Fork 302
/
MQConsumer.java
141 lines (117 loc) · 5.33 KB
/
MQConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.liushaoming.jseckill.backend.mq;
import com.alibaba.fastjson.JSON;
import com.liushaoming.jseckill.backend.bean.MQConfigBean;
import com.liushaoming.jseckill.backend.constant.RedisKey;
import com.liushaoming.jseckill.backend.dto.SeckillMsgBody;
import com.liushaoming.jseckill.backend.enums.AckAction;
import com.liushaoming.jseckill.backend.enums.SeckillStateEnum;
import com.liushaoming.jseckill.backend.exception.SeckillException;
import com.liushaoming.jseckill.backend.service.SeckillService;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import javax.annotation.Resource;
import java.io.IOException;
@Component
public class MQConsumer {
private static final Logger logger = LoggerFactory.getLogger(MQConsumer.class);
@Resource
private SeckillService seckillService;
@Resource(name = "mqConnectionReceive")
private Connection mqConnectionReceive;
@Resource(name = "initJedisPool")
private JedisPool jedisPool;
@Autowired
private MQConfigBean mqConfigBean;
public void receive() {
Channel channel = null;
try {
channel = mqConnectionReceive.createChannel();
channel.queueDeclare(mqConfigBean.getQueue(), true, false, false, null);
channel.basicQos(0, 1, false);
} catch (IOException e) {
e.printStackTrace();
}
MyDefaultConsumer myDefaultConsumer = new MyDefaultConsumer(channel);
try {
channel.basicConsume(mqConfigBean.getQueue(), false, myDefaultConsumer);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
private class MyDefaultConsumer extends DefaultConsumer {
private Channel channel;
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public MyDefaultConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
long threadId1 = Thread.currentThread().getId();
logger.info("---receive_threadId_1={}", threadId1);
String msg = new String(body, "UTF-8");
logger.info("[mqReceive] '" + msg + "'");
SeckillMsgBody msgBody = JSON.parseObject(msg, SeckillMsgBody.class);
AckAction ackAction = AckAction.ACCEPT;
try {
// 这里演延时2秒,模式秒杀的耗时操作, 上线的时候需要注释掉
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// logger.error(e.getMessage(), e);
// }
seckillService.handleInRedis(msgBody.getSeckillId(), msgBody.getUserPhone());
ackAction = AckAction.ACCEPT;
} catch (SeckillException seckillE) {
if (seckillE.getSeckillStateEnum() == SeckillStateEnum.SOLD_OUT
|| seckillE.getSeckillStateEnum() == SeckillStateEnum.REPEAT_KILL) {
// 已售罄,或者此人之前已经秒杀过的
ackAction = AckAction.THROW;
} else {
logger.error(seckillE.getMessage(), seckillE);
logger.info("---->NACK--error_requeue!!!");
ackAction = AckAction.RETRY;
}
} finally {
logger.info("------processIt----");
switch (ackAction) {
case ACCEPT:
try {
logger.info("---->ACK");
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (IOException ioE) {
logger.info("---------basicAck_throws_IOException----------");
logger.error(ioE.getMessage(), ioE);
throw ioE;
}
Jedis jedis = jedisPool.getResource();
jedis.srem(RedisKey.QUEUE_PRE_SECKILL, msgBody.getSeckillId() + "@" + msgBody.getUserPhone());
jedis.close();
break;
case THROW:
logger.info("--LET_MQ_ACK REASON:SeckillStateEnum.SOLD_OUT,SeckillStateEnum.REPEAT_KILL");
channel.basicAck(envelope.getDeliveryTag(), false);
Jedis jedis1 = jedisPool.getResource();
jedis1.srem(RedisKey.QUEUE_PRE_SECKILL, msgBody.getSeckillId() + "@" + msgBody.getUserPhone());
jedis1.close();
break;
case RETRY:
logger.info("---->NACK--error_requeue!!!");
channel.basicNack(envelope.getDeliveryTag(), false, true);
break;
}
}
}
}
}