/
PayNotifyServiceImpl.java
294 lines (265 loc) · 12 KB
/
PayNotifyServiceImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package cn.iocoder.yudao.module.pay.service.notify;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.date.DateUtils;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.pay.api.notify.dto.PayOrderNotifyReqDTO;
import cn.iocoder.yudao.module.pay.api.notify.dto.PayRefundNotifyReqDTO;
import cn.iocoder.yudao.module.pay.controller.admin.notify.vo.PayNotifyTaskPageReqVO;
import cn.iocoder.yudao.module.pay.dal.dataobject.notify.PayNotifyLogDO;
import cn.iocoder.yudao.module.pay.dal.dataobject.notify.PayNotifyTaskDO;
import cn.iocoder.yudao.module.pay.dal.dataobject.order.PayOrderDO;
import cn.iocoder.yudao.module.pay.dal.dataobject.refund.PayRefundDO;
import cn.iocoder.yudao.module.pay.dal.mysql.notify.PayNotifyLogMapper;
import cn.iocoder.yudao.module.pay.dal.mysql.notify.PayNotifyTaskMapper;
import cn.iocoder.yudao.module.pay.dal.redis.notify.PayNotifyLockRedisDAO;
import cn.iocoder.yudao.module.pay.enums.notify.PayNotifyStatusEnum;
import cn.iocoder.yudao.module.pay.enums.notify.PayNotifyTypeEnum;
import cn.iocoder.yudao.module.pay.service.order.PayOrderService;
import cn.iocoder.yudao.module.pay.service.refund.PayRefundService;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.framework.common.util.date.LocalDateTimeUtils.addTime;
import static cn.iocoder.yudao.module.pay.framework.job.config.PayJobConfiguration.NOTIFY_THREAD_POOL_TASK_EXECUTOR;
/**
* 支付通知 Core Service 实现类
*
* @author 芋道源码
*/
@Service
@Valid
@Slf4j
public class PayNotifyServiceImpl implements PayNotifyService {
/**
* 通知超时时间,单位:秒
*/
public static final int NOTIFY_TIMEOUT = 120;
/**
* {@link #NOTIFY_TIMEOUT} 的毫秒
*/
public static final long NOTIFY_TIMEOUT_MILLIS = 120 * DateUtils.SECOND_MILLIS;
@Resource
@Lazy // 循环依赖,避免报错
private PayOrderService orderService;
@Resource
@Lazy // 循环依赖,避免报错
private PayRefundService refundService;
@Resource
private PayNotifyTaskMapper notifyTaskMapper;
@Resource
private PayNotifyLogMapper notifyLogMapper;
@Resource(name = NOTIFY_THREAD_POOL_TASK_EXECUTOR)
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
private PayNotifyLockRedisDAO notifyLockCoreRedisDAO;
@Override
@Transactional(rollbackFor = Exception.class)
public void createPayNotifyTask(Integer type, Long dataId) {
PayNotifyTaskDO task = new PayNotifyTaskDO().setType(type).setDataId(dataId);
task.setStatus(PayNotifyStatusEnum.WAITING.getStatus()).setNextNotifyTime(LocalDateTime.now())
.setNotifyTimes(0).setMaxNotifyTimes(PayNotifyTaskDO.NOTIFY_FREQUENCY.length + 1);
// 补充 appId + notifyUrl 字段
if (Objects.equals(task.getType(), PayNotifyTypeEnum.ORDER.getType())) {
PayOrderDO order = orderService.getOrder(task.getDataId()); // 不进行非空判断,有问题直接异常
task.setAppId(order.getAppId()).
setMerchantOrderId(order.getMerchantOrderId()).setNotifyUrl(order.getNotifyUrl());
} else if (Objects.equals(task.getType(), PayNotifyTypeEnum.REFUND.getType())) {
PayRefundDO refundDO = refundService.getRefund(task.getDataId());
task.setAppId(refundDO.getAppId())
.setMerchantOrderId(refundDO.getMerchantOrderId()).setNotifyUrl(refundDO.getNotifyUrl());
}
// 执行插入
notifyTaskMapper.insert(task);
// 必须在事务提交后,在发起任务,否则 PayNotifyTaskDO 还没入库,就提前回调接入的业务
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
executeNotify(task);
}
});
}
@Override
public int executeNotify() throws InterruptedException {
// 获得需要通知的任务
List<PayNotifyTaskDO> tasks = notifyTaskMapper.selectListByNotify();
if (CollUtil.isEmpty(tasks)) {
return 0;
}
// 遍历,逐个通知
CountDownLatch latch = new CountDownLatch(tasks.size());
tasks.forEach(task -> threadPoolTaskExecutor.execute(() -> {
try {
executeNotify(task);
} finally {
latch.countDown();
}
}));
// 等待完成
awaitExecuteNotify(latch);
// 返回执行完成的任务数(成功 + 失败)
return tasks.size();
}
/**
* 等待全部支付通知的完成
* 每 1 秒会打印一次剩余任务数量
*
* @param latch Latch
* @throws InterruptedException 如果被打断
*/
private void awaitExecuteNotify(CountDownLatch latch) throws InterruptedException {
long size = latch.getCount();
for (int i = 0; i < NOTIFY_TIMEOUT; i++) {
if (latch.await(1L, TimeUnit.SECONDS)) {
return;
}
log.info("[awaitExecuteNotify][任务处理中, 总任务数({}) 剩余任务数({})]", size, latch.getCount());
}
log.error("[awaitExecuteNotify][任务未处理完,总任务数({}) 剩余任务数({})]", size, latch.getCount());
}
/**
* 同步执行单个支付通知
*
* @param task 通知任务
*/
public void executeNotify(PayNotifyTaskDO task) {
// 分布式锁,避免并发问题
notifyLockCoreRedisDAO.lock(task.getId(), NOTIFY_TIMEOUT_MILLIS, () -> {
// 校验,当前任务是否已经被通知过
// 虽然已经通过分布式加锁,但是可能同时满足通知的条件,然后都去获得锁。此时,第一个执行完后,第二个还是能拿到锁,然后会再执行一次。
// 因此,此处我们通过第 notifyTimes 通知次数是否匹配来判断
PayNotifyTaskDO dbTask = notifyTaskMapper.selectById(task.getId());
if (ObjectUtil.notEqual(task.getNotifyTimes(), dbTask.getNotifyTimes())) {
log.warn("[executeNotifySync][task({}) 任务被忽略,原因是它的通知不是第 ({}) 次,可能是因为并发执行了]",
JsonUtils.toJsonString(task), dbTask.getNotifyTimes());
return;
}
// 执行通知
getSelf().executeNotify0(dbTask);
});
}
@Transactional(rollbackFor = Exception.class)
public void executeNotify0(PayNotifyTaskDO task) {
// 发起回调
CommonResult<?> invokeResult = null;
Throwable invokeException = null;
try {
invokeResult = executeNotifyInvoke(task);
} catch (Throwable e) {
invokeException = e;
}
// 处理结果
Integer newStatus = processNotifyResult(task, invokeResult, invokeException);
// 记录 PayNotifyLog 日志
String response = invokeException != null ? ExceptionUtil.getRootCauseMessage(invokeException) :
JsonUtils.toJsonString(invokeResult);
notifyLogMapper.insert(PayNotifyLogDO.builder().taskId(task.getId())
.notifyTimes(task.getNotifyTimes() + 1).status(newStatus).response(response).build());
}
/**
* 执行单个支付任务的 HTTP 调用
*
* @param task 通知任务
* @return HTTP 响应
*/
private CommonResult<?> executeNotifyInvoke(PayNotifyTaskDO task) {
// 拼接 body 参数
Object request;
if (Objects.equals(task.getType(), PayNotifyTypeEnum.ORDER.getType())) {
request = PayOrderNotifyReqDTO.builder().merchantOrderId(task.getMerchantOrderId())
.payOrderId(task.getDataId()).build();
} else if (Objects.equals(task.getType(), PayNotifyTypeEnum.REFUND.getType())) {
request = PayRefundNotifyReqDTO.builder().merchantOrderId(task.getMerchantOrderId())
.payRefundId(task.getDataId()).build();
} else {
throw new RuntimeException("未知的通知任务类型:" + JsonUtils.toJsonString(task));
}
// 拼接 header 参数
Map<String, String> headers = new HashMap<>();
TenantUtils.addTenantHeader(headers, task.getTenantId());
// 发起请求
try (HttpResponse response = HttpUtil.createPost(task.getNotifyUrl())
.body(JsonUtils.toJsonString(request)).addHeaders(headers)
.timeout((int) NOTIFY_TIMEOUT_MILLIS).execute()) {
// 解析结果
return JsonUtils.parseObject(response.body(), CommonResult.class);
}
}
/**
* 处理并更新通知结果
*
* @param task 通知任务
* @param invokeResult 通知结果
* @param invokeException 通知异常
* @return 最终任务的状态
*/
@VisibleForTesting
Integer processNotifyResult(PayNotifyTaskDO task, CommonResult<?> invokeResult, Throwable invokeException) {
// 设置通用的更新 PayNotifyTaskDO 的字段
PayNotifyTaskDO updateTask = new PayNotifyTaskDO()
.setId(task.getId())
.setLastExecuteTime(LocalDateTime.now())
.setNotifyTimes(task.getNotifyTimes() + 1);
// 情况一:调用成功
if (invokeResult != null && invokeResult.isSuccess()) {
updateTask.setStatus(PayNotifyStatusEnum.SUCCESS.getStatus());
notifyTaskMapper.updateById(updateTask);
return updateTask.getStatus();
}
// 情况二:调用失败、调用异常
// 2.1 超过最大回调次数
if (updateTask.getNotifyTimes() >= PayNotifyTaskDO.NOTIFY_FREQUENCY.length) {
updateTask.setStatus(PayNotifyStatusEnum.FAILURE.getStatus());
notifyTaskMapper.updateById(updateTask);
return updateTask.getStatus();
}
// 2.2 未超过最大回调次数
updateTask.setNextNotifyTime(addTime(Duration.ofSeconds(PayNotifyTaskDO.NOTIFY_FREQUENCY[updateTask.getNotifyTimes()])));
updateTask.setStatus(invokeException != null ? PayNotifyStatusEnum.REQUEST_FAILURE.getStatus()
: PayNotifyStatusEnum.REQUEST_SUCCESS.getStatus());
notifyTaskMapper.updateById(updateTask);
return updateTask.getStatus();
}
@Override
public PayNotifyTaskDO getNotifyTask(Long id) {
return notifyTaskMapper.selectById(id);
}
@Override
public PageResult<PayNotifyTaskDO> getNotifyTaskPage(PayNotifyTaskPageReqVO pageReqVO) {
return notifyTaskMapper.selectPage(pageReqVO);
}
@Override
public List<PayNotifyLogDO> getNotifyLogList(Long taskId) {
return notifyLogMapper.selectListByTaskId(taskId);
}
/**
* 获得自身的代理对象,解决 AOP 生效问题
*
* @return 自己
*/
private PayNotifyServiceImpl getSelf() {
return SpringUtil.getBean(getClass());
}
}