1_1_0升级指南
1.1.0
版本添加了几个重大的更新,有些无法向下兼容:
原先Wx*MessageInterceptor
和Wx*MessageHandler
的方法签名发生了变化:
从原来的:
public boolean intercept(Wx*XmlMessage wxMessage,
Map<String, Object> context,
Wx*Service wx*Service);
public Wx*XmlOutMessage handle(Wx*XmlMessage wxMessage,
Map<String, Object> context,
Wx*Service wx*Service);
变成了现在的:
public boolean intercept(Wx*XmlMessage wxMessage,
Map<String, Object> context,
WxCpService wxCpService,
WxSessionManager sessionManager);
public Wx*XmlOutMessage handle(Wx*XmlMessage wxMessage,
Map<String, Object> context,
Wx*Service wx*Service,
WxSessionManager sessionManager);
区别就是多了一个WxSessionManager
参数,开发人员可以使用WxSessionManager
来模session,具体用法看
WxSession的使用。
原来Wx*ConfigStorage
的expiresIn
变成了expiresTime
,请各位注意修改代码。
原先Access Token的刷新策略是,假定Access Token是最新的,如果在调用api的时候返回了,错误,才刷新Access Token。也就是说采用的是懒刷新机制:
刷新Access Token的地方在这里:
public <T, E> T execute(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException {
if (StringUtils.isBlank(wxMpConfigStorage.getAccessToken())) {
accessTokenRefresh();
}
String accessToken = wxMpConfigStorage.getAccessToken();
String uriWithAccessToken = uri;
uriWithAccessToken += uri.indexOf('?') == -1 ? "?access_token=" + accessToken : "&access_token=" + accessToken;
try {
return executor.execute(getHttpclient(), httpProxy, uriWithAccessToken, data);
} catch (WxErrorException e) {
WxError error = e.getError();
/*
* 发生以下情况时尝试刷新access_token
* 40001 获取access_token时AppSecret错误,或者access_token无效
* 42001 access_token超时
*/
if (error.getErrorCode() == 42001 || error.getErrorCode() == 40001) {
accessTokenRefresh();
return execute(executor, uri, data);
}
// ...
} catch (ClientProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
为了避免多线程并发下同时发起多次刷新Access Token的请求,用了一个AtomicBoolean
标识是否正在刷新,
如果有线程发现正在刷新Access Token,那么就进入一个等待刷新结束的循环,每次等待100ms。也就是说采用的是轮询等待策略:
public void accessTokenRefresh() throws WxErrorException {
if (!GLOBAL_ACCESS_TOKEN_REFRESH_FLAG.getAndSet(true)) {
try {
// ...
try {
// ...
HttpGet httpGet = new HttpGet(url);
// ...
String resultContent = new BasicResponseHandler().handleResponse(response);
WxError error = WxError.fromJson(resultContent);
if (error.getErrorCode() != 0) {
throw new WxErrorException(error);
}
WxAccessToken accessToken = WxAccessToken.fromJson(resultContent);
wxMpConfigStorage.updateAccessToken(accessToken.getAccessToken(), accessToken.getExpiresIn());
} catch (ClientProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
} finally {
GLOBAL_ACCESS_TOKEN_REFRESH_FLAG.set(false);
}
} else {
// 每隔100ms检查一下是否刷新完毕了
while (GLOBAL_ACCESS_TOKEN_REFRESH_FLAG.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
// 刷新完毕了,就没他什么事儿了
}
}
原先方式的问题有两个:
- N+1请求:系统会先发起一次正常的A请求,发现Access Token过期,发起一次刷新请求,然后再次发起A请求,整个请求的次数有3次。
- 效率问题:等待刷新完毕的地方使用了
Thread.sleep(100)
,这样的效率并不高。
把N+1请求改成N请求的解决办法是:
我把Wx*ConfigStorage
里原来基本没用的expiresIn
改成了expiresTime
,
expiresTime
是一个表示Access Token在未来过期的某个时间点所代表的时间戳(毫秒)。
并且添加了isAccessTokenExpired
方法:
public boolean isAccessTokenExpired() {
return System.currentTimeMillis() > this.expiresTime;
}
而在调用微信API前,系统会先去获得Access Token,这里不是简单从configStorage.getAccessToken
了,
而是先查询是否isAccessTokenExpired
,如果没有过期那就直接返回configStorage的Access Token,如果过期了,
那么刷新一下Access Token,然后再进行正常的API调用。
下面代码里的getAccessToken
方法会判断是否过期,如果过期刷新,如果没过期就返回。这样也就避免了多一次请求的问题:
protected <T, E> T executeInternal(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException {
String accessToken = getAccessToken(false);
String uriWithAccessToken = uri;
uriWithAccessToken += uri.indexOf('?') == -1 ? "?access_token=" + accessToken : "&access_token=" + accessToken;
try {
return executor.execute(getHttpclient(), httpProxy, uriWithAccessToken, data);
} catch (WxErrorException e) {
WxError error = e.getError();
/*
* 发生以下情况时尝试刷新access_token
* 40001 获取access_token时AppSecret错误,或者access_token无效
* 42001 access_token超时
*/
if (error.getErrorCode() == 42001 || error.getErrorCode() == 40001) {
// 强制设置wxMpConfigStorage它的access token过期了,这样在下一次请求里就会刷新access token
wxMpConfigStorage.expireAccessToken();
return execute(executor, uri, data);
}
if (error.getErrorCode() != 0) {
throw new WxErrorException(error);
}
return null;
} catch (ClientProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
解决效率问题的办法是:用double check lock替代了原先的轮询等待。
public String getAccessToken(boolean forceRefresh) throws WxErrorException {
if (forceRefresh) {
wxMpConfigStorage.expireAccessToken();
}
if (wxMpConfigStorage.isAccessTokenExpired()) {
synchronized (globalAccessTokenRefreshLock) {
if (wxMpConfigStorage.isAccessTokenExpired()) {
String url = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential"
+ "&appid=" + wxMpConfigStorage.getAppId()
+ "&secret=" + wxMpConfigStorage.getSecret()
;
try {
HttpGet httpGet = new HttpGet(url);
if (httpProxy != null) {
RequestConfig config = RequestConfig.custom().setProxy(httpProxy).build();
httpGet.setConfig(config);
}
CloseableHttpClient httpclient = getHttpclient();
CloseableHttpResponse response = httpclient.execute(httpGet);
String resultContent = new BasicResponseHandler().handleResponse(response);
WxError error = WxError.fromJson(resultContent);
if (error.getErrorCode() != 0) {
throw new WxErrorException(error);
}
WxAccessToken accessToken = WxAccessToken.fromJson(resultContent);
wxMpConfigStorage.updateAccessToken(accessToken.getAccessToken(), accessToken.getExpiresIn());
} catch (ClientProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
return wxMpConfigStorage.getAccessToken();
}
实际上,这次版本引入的js_api_ticket也是采用同样的策略实现的。
微信平台有时候会返回繁忙代码,这个时候系统会自动重试,但是原来的实现有问题,在这里说明一下:
protected final ThreadLocal<Integer> retryTimes = new ThreadLocal<Integer>();
public <T, E> T execute(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException {
// ...
try {
return executor.execute(getHttpclient(), httpProxy, uriWithAccessToken, data);
} catch (WxErrorException e) {
// ...
/**
* -1 系统繁忙, 1000ms后重试
*/
if (error.getErrorCode() == -1) {
if(retryTimes.get() == null) {
retryTimes.set(0);
}
if (retryTimes.get() > 4) {
retryTimes.set(0);
throw new RuntimeException("微信服务端异常,超出重试次数");
}
int sleepMillis = 1000 * (1 << retryTimes.get());
try {
System.out.println("微信系统繁忙," + sleepMillis + "ms后重试");
Thread.sleep(sleepMillis);
retryTimes.set(retryTimes.get() + 1);
return execute(executor, uri, data);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
}
if (error.getErrorCode() != 0) {
throw new WxErrorException(error);
}
return null;
} catch (ClientProtocolException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
从上面代码可以看到,当微信平台繁忙的时候系统会在当前线程里重试5次,每次重试间隔的时间是 1 * 2 ^ (retryTimes)
秒。
本来这一切都没有什么问题,但是当我在某个版本里使用线程池的时候就有问题了:
public class WxMpMessageRouter {
private final ExecutorService executorService;
/**
* 处理微信消息
* @param wxMessage
*/
public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage) {
// ...
executorService.submit(new Runnable() {
public void run() {
rule.service(wxMessage);
}
});
// ...
}
}
Wx*Service
里的retryTimes
是一个ThreadLocal
对象,也就是说是绑定到线程上的,
而线城池的线程是回收使用了,这就造成当某个rule在处理消息的时候,所获得的retryTimes
不是清0的。
那么他的重试次数可能就没有5次了。
所以我就把代码改成了这样:
public <T, E> T execute(RequestExecutor<T, E> executor, String uri, E data) throws WxErrorException {
int retryTimes = 0;
do {
try {
return executeInternal(executor, uri, data);
} catch (WxErrorException e) {
WxError error = e.getError();
/**
* -1 系统繁忙, 1000ms后重试
*/
if (error.getErrorCode() == -1) {
int sleepMillis = retrySleepMillis * (1 << retryTimes);
try {
log.debug("微信系统繁忙,{}ms 后重试(第{}次)", sleepMillis, retryTimes + 1);
Thread.sleep(sleepMillis);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
} else {
throw e;
}
}
} while(++retryTimes < maxRetryTimes);
throw new RuntimeException("微信服务端异常,超出重试次数");
}
在这里retryTimes
变成了局部变量,就不会有前面讲的问题了。