-
Notifications
You must be signed in to change notification settings - Fork 1
/
RetryingMetaStoreClient.java
279 lines (254 loc) · 12 KB
/
RetryingMetaStoreClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* RetryingMetaStoreClient. Creates a proxy for an IMetaStoreClient
* implementation and retries calls to it on failure.
* If the login user is authenticated using a keytab, the user is logged in
* again (when needed) before each call.
*
*/
@Public
public class RetryingMetaStoreClient implements InvocationHandler {
/** Logger shared by all instances of this handler. */
private static final Logger LOG = LoggerFactory.getLogger(RetryingMetaStoreClient.class.getName());
/** The real client every proxied call is forwarded to; created reflectively in the constructor. */
private final IMetaStoreClient base;
/** Maximum number of retries after a retryable failure (read from METASTORETHRIFTFAILURERETRIES). */
private final int retryLimit;
/** Pause between two attempts, in seconds (read from METASTORE_CLIENT_CONNECT_RETRY_DELAY). */
private final long retryDelaySeconds;
/** Accumulated call time per method signature string; may be null, in which case no timing is recorded. */
private final ConcurrentHashMap<String, Long> metaCallTimeMap;
/** Connection lifetime in milliseconds (METASTORE_CLIENT_SOCKET_LIFETIME); values <= 0 disable lifetime-based reconnects. */
private final long connectionLifeTimeInMillis;
/** Wall-clock time (ms) of construction or of the most recent reconnect performed by invoke. */
private long lastConnectionTime;
/** True when METASTOREURIS is unset or blank; lifetime-based reconnects are skipped in that case. */
private boolean localMetaStore;
/**
 * Reads the retry and connection-lifetime settings from the configuration, performs a
 * keytab relogin check, and then reflectively instantiates the wrapped client.
 *
 * @param hiveConf configuration the retry settings and metastore URIs are read from
 * @param constructorArgTypes parameter types of the client constructor to invoke
 * @param constructorArgs arguments passed to that client constructor
 * @param metaCallTimeMap map receiving per-method call times; may be null to disable timing
 * @param msClientClass concrete client class to instantiate
 * @throws MetaException if the keytab relogin fails (instantiation errors surface as
 *         whatever MetaStoreUtils.newInstance raises)
 */
protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes,
    Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
    Class<? extends IMetaStoreClient> msClientClass) throws MetaException {
  this.metaCallTimeMap = metaCallTimeMap;

  // Number of retries after a failed metastore call.
  // NOTE(review): the original author notes a default of 1 - confirm against HiveConf.
  this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES);

  // Seconds to wait between consecutive attempts.
  // NOTE(review): the original author notes a default of 1s - confirm against HiveConf.
  this.retryDelaySeconds = hiveConf.getTimeVar(
      HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);

  // Client socket lifetime. Once exceeded, the next metastore operation reconnects first;
  // a non-positive value means the connection never expires.
  // NOTE(review): the original author notes a default of 0s - confirm against HiveConf.
  this.connectionLifeTimeInMillis = hiveConf.getTimeVar(
      HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS);

  // Construction time counts as the moment of the last connection.
  this.lastConnectionTime = System.currentTimeMillis();

  // The metastore is considered local exactly when no URI is configured (null or blank).
  final String configuredUris = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
  this.localMetaStore = configuredUris == null || configuredUris.trim().isEmpty();

  reloginExpiringKeytabUser();

  // Instantiate the concrete client reflectively; every proxied call is forwarded to it.
  // (Per the original author's note, Hive passes SessionHiveMetaStoreClient here and the
  // connection to the metastore is opened by that constructor.)
  final Object client =
      MetaStoreUtils.newInstance(msClientClass, constructorArgTypes, constructorArgs);
  this.base = (IMetaStoreClient) client;
}
/**
 * Creates a retrying proxy around a HiveMetaStoreClient built with the
 * (HiveConf, Boolean) constructor. No call timings are recorded.
 *
 * @param hiveConf configuration used for both the proxy and the wrapped client
 * @param allowEmbedded forwarded to the client constructor
 * @return a proxy that forwards to the client and retries on failure
 * @throws MetaException if the proxy or the wrapped client cannot be created
 */
public static IMetaStoreClient getProxy(
    HiveConf hiveConf, boolean allowEmbedded) throws MetaException {
  final Class<?>[] ctorArgTypes = {HiveConf.class, Boolean.class};
  final Object[] ctorArgs = {hiveConf, allowEmbedded};
  final String clientClassName = HiveMetaStoreClient.class.getName();
  return getProxy(hiveConf, ctorArgTypes, ctorArgs, null, clientClassName);
}
/**
 * Test-oriented factory: creates a retrying proxy for the named client class with
 * embedded mode allowed and without call-time recording.
 *
 * @param hiveConf configuration used for both the proxy and the wrapped client
 * @param hookLoader hook loader forwarded to the client constructor
 * @param mscClassName fully qualified name of the client class to instantiate
 * @return a proxy that forwards to the client and retries on failure
 * @throws MetaException if the proxy or the wrapped client cannot be created
 */
@VisibleForTesting
public static IMetaStoreClient getProxy(HiveConf hiveConf, HiveMetaHookLoader hookLoader,
    String mscClassName) throws MetaException {
  final ConcurrentHashMap<String, Long> noCallTimes = null;
  final boolean allowEmbedded = true;
  return getProxy(hiveConf, hookLoader, noCallTimes, mscClassName, allowEmbedded);
}
/**
 * Creates a retrying proxy for a client class that has a
 * (HiveConf, HiveMetaHookLoader, Boolean) constructor.
 * Per the original author's note, Hive itself calls this overload with
 * SessionHiveMetaStoreClient as {@code mscClassName}.
 *
 * @param hiveConf configuration used for both the proxy and the wrapped client
 * @param hookLoader hook loader forwarded to the client constructor
 * @param metaCallTimeMap map receiving per-method call times; may be null
 * @param mscClassName fully qualified name of the client class to instantiate
 * @param allowEmbedded forwarded to the client constructor
 * @return a proxy that forwards to the client and retries on failure
 * @throws MetaException if the proxy or the wrapped client cannot be created
 */
public static IMetaStoreClient getProxy(HiveConf hiveConf, HiveMetaHookLoader hookLoader,
    ConcurrentHashMap<String, Long> metaCallTimeMap, String mscClassName, boolean allowEmbedded)
    throws MetaException {
  final Class<?>[] ctorArgTypes = {HiveConf.class, HiveMetaHookLoader.class, Boolean.class};
  final Object[] ctorArgs = {hiveConf, hookLoader, allowEmbedded};
  return getProxy(hiveConf, ctorArgTypes, ctorArgs, metaCallTimeMap, mscClassName);
}
/**
 * Factory meant for Hive internal use only: creates a retrying proxy for the named client
 * class using an arbitrary constructor signature, without call-time recording.
 * External callers should prefer the overloads that take a HiveMetaHookLoader.
 *
 * @param hiveConf configuration the retry settings are read from
 * @param constructorArgTypes parameter types of the client constructor to invoke
 * @param constructorArgs arguments passed to that client constructor
 * @param mscClassName fully qualified name of the client class to instantiate
 * @return a proxy that forwards to the client and retries on failure
 * @throws MetaException if the proxy or the wrapped client cannot be created
 */
public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes,
    Object[] constructorArgs, String mscClassName) throws MetaException {
  final ConcurrentHashMap<String, Long> noCallTimes = null;
  return getProxy(hiveConf, constructorArgTypes, constructorArgs, noCallTimes, mscClassName);
}
/**
 * Factory meant for Hive internal use only: loads the named client class, wraps a new
 * instance of it in a RetryingMetaStoreClient handler and returns a dynamic proxy.
 * External callers should prefer the overloads that take a HiveMetaHookLoader.
 *
 * @param hiveConf configuration the retry settings are read from
 * @param constructorArgTypes parameter types of the client constructor to invoke
 * @param constructorArgs arguments passed to that client constructor
 * @param metaCallTimeMap map receiving per-method call times; may be null
 * @param mscClassName fully qualified name of the client class to instantiate
 * @return a proxy whose calls are routed through {@link #invoke}
 * @throws MetaException if the class cannot be loaded or the client cannot be created
 */
public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes,
    Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap,
    String mscClassName) throws MetaException {
  // Resolve the client class by name.
  @SuppressWarnings("unchecked")
  Class<? extends IMetaStoreClient> clientClass =
      (Class<? extends IMetaStoreClient>) MetaStoreUtils.getClass(mscClassName);

  // The handler's constructor instantiates the real client (its "base"), which invoke()
  // later forwards every call to.
  RetryingMetaStoreClient retryHandler = new RetryingMetaStoreClient(
      hiveConf, constructorArgTypes, constructorArgs, metaCallTimeMap, clientClass);

  // Build a dynamic proxy over the interfaces the client class declares directly; each call
  // on the proxy goes through retryHandler.invoke().
  // NOTE(review): getInterfaces() does not include interfaces inherited from superclasses -
  // confirm every client class passed here declares IMetaStoreClient itself.
  ClassLoader proxyLoader = RetryingMetaStoreClient.class.getClassLoader();
  Class<?>[] proxiedInterfaces = clientClass.getInterfaces();
  Object proxy = Proxy.newProxyInstance(proxyLoader, proxiedInterfaces, retryHandler);
  return (IMetaStoreClient) proxy;
}
/** Message shape of a retryable MetaException that was unwrapped from the reflective call. */
private static final Pattern RETRYABLE_INVOCATION_MESSAGE =
    Pattern.compile("(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*");

/** Message shape of a retryable MetaException thrown directly inside the retry loop. */
private static final Pattern RETRYABLE_DIRECT_MESSAGE =
    Pattern.compile("(?s).*(IO|TTransport)Exception.*");

/** A message containing this marker is never retried, even if it matches a retryable pattern. */
private static final String NON_RETRYABLE_MARKER =
    "java.sql.SQLIntegrityConstraintViolationException";

/**
 * Forwards a proxied call to the wrapped client, retrying it after retryable failures.
 *
 * <p>Before every attempt the keytab login is refreshed if needed. The client is reconnected
 * when this is a retry or when the configured connection lifetime has elapsed. When a call-time
 * map was supplied, the duration of a successful call is added to it.
 *
 * <p>Retryable failures are: TApplicationException of a type other than UNSUPPORTED_CLIENT_TYPE,
 * UNKNOWN_METHOD, WRONG_METHOD_NAME or INVALID_PROTOCOL; TProtocolException; TTransportException;
 * and MetaException whose message matches one of the retryable patterns and does not mention an
 * integrity-constraint violation. Everything else is rethrown immediately.
 *
 * @param proxy the proxy instance the method was invoked on (unused)
 * @param method the interface method being invoked
 * @param args the call arguments
 * @return whatever the wrapped client returned
 * @throws Throwable the non-retryable failure, or the last retryable failure once
 *         {@code retryLimit} retries have been made; InterruptedException if the
 *         thread is interrupted while waiting between attempts
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  Object ret = null;
  int retriesMade = 0;
  TException caughtException = null;
  while (true) {
    try {
      reloginExpiringKeytabUser();
      // With a non-positive connection lifetime hasConnectionLifeTimeReached() is always
      // false, so a reconnect then only happens on a retry (retriesMade > 0).
      if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
        base.reconnect();
        lastConnectionTime = System.currentTimeMillis();
      }
      if (metaCallTimeMap == null) {
        ret = method.invoke(base, args);
      } else {
        // need to capture the timing
        long startTime = System.currentTimeMillis();
        ret = method.invoke(base, args);
        long timeTaken = System.currentTimeMillis() - startTime;
        addMethodTime(method, timeTaken);
      }
      break;
    } catch (UndeclaredThrowableException e) {
      // "throw null" would raise a NullPointerException and hide the real failure.
      Throwable cause = e.getCause();
      throw (cause != null) ? cause : e;
    } catch (InvocationTargetException e) {
      Throwable t = e.getCause();
      if (t == null) {
        throw e;
      }
      if (t instanceof TApplicationException) {
        TApplicationException tae = (TApplicationException) t;
        switch (tae.getType()) {
          case TApplicationException.UNSUPPORTED_CLIENT_TYPE:
          case TApplicationException.UNKNOWN_METHOD:
          case TApplicationException.WRONG_METHOD_NAME:
          case TApplicationException.INVALID_PROTOCOL:
            throw t;
          default:
            // TODO: most other options are probably unrecoverable... throw?
            caughtException = tae;
        }
      } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) {
        // TODO: most protocol exceptions are probably unrecoverable... throw?
        caughtException = (TException) t;
      } else if ((t instanceof MetaException)
          && isRetryableMessage((MetaException) t, RETRYABLE_INVOCATION_MESSAGE)) {
        caughtException = (MetaException) t;
      } else {
        throw t;
      }
    } catch (MetaException e) {
      if (isRetryableMessage(e, RETRYABLE_DIRECT_MESSAGE)) {
        caughtException = e;
      } else {
        throw e;
      }
    }
    // Give up once the configured number of retries has been used.
    if (retriesMade >= retryLimit) {
      throw caughtException;
    }
    retriesMade++;
    LOG.warn("MetaStoreClient lost connection. Attempting to reconnect.",
        caughtException);
    Thread.sleep(retryDelaySeconds * 1000);
  }
  return ret;
}

/**
 * Decides whether a MetaException should be retried, based on its message.
 * A missing message is treated as non-retryable so the original exception is rethrown
 * instead of being replaced by a NullPointerException.
 */
private static boolean isRetryableMessage(MetaException e, Pattern retryablePattern) {
  String message = e.getMessage();
  return message != null
      && retryablePattern.matcher(message).matches()
      && !message.contains(NON_RETRYABLE_MARKER);
}
/**
 * Adds {@code timeTaken} to the total stored in the call-time map under the method's
 * signature string, retrying until the update wins against concurrent writers.
 *
 * @param method the method that was just invoked
 * @param timeTaken duration of that invocation in milliseconds
 */
private void addMethodTime(Method method, long timeTaken) {
  final String key = getMethodString(method);
  final Long elapsed = timeTaken;
  boolean recorded = false;
  while (!recorded) {
    final Long previousTotal = metaCallTimeMap.get(key);
    if (previousTotal == null) {
      // First sample for this method: succeeds only if nobody inserted in the meantime.
      recorded = metaCallTimeMap.putIfAbsent(key, elapsed) == null;
    } else {
      // Compare-and-set: succeeds only if the total is still the one just read.
      recorded = metaCallTimeMap.replace(key, previousTotal, elapsed + previousTotal);
    }
  }
}
/**
 * Builds the key used in the call-time map: the method name followed by "_(" , then each
 * parameter type's simple name followed by ", " (so the last one keeps a trailing ", "),
 * then ")".
 *
 * @param method the method to describe
 * @return String representation with arg types. eg getDatabase_(String, )
 */
private String getMethodString(Method method) {
  final StringBuilder signature = new StringBuilder(method.getName()).append("_(");
  final Class<?>[] parameterTypes = method.getParameterTypes();
  for (int i = 0; i < parameterTypes.length; i++) {
    signature.append(parameterTypes[i].getSimpleName()).append(", ");
  }
  return signature.append(")").toString();
}
/**
 * Tells whether the client should reconnect before the given call because the configured
 * connection lifetime has elapsed since the last (re)connection.
 *
 * @param method the method about to be invoked
 * @return false when no positive lifetime is configured, when the metastore is local, or when
 *         the method is named "close" (case-insensitive); otherwise whether the connection age
 *         has reached the lifetime
 */
private boolean hasConnectionLifeTimeReached(Method method) {
  if (connectionLifeTimeInMillis <= 0) {
    return false;
  }
  if (localMetaStore) {
    return false;
  }
  if ("close".equalsIgnoreCase(method.getName())) {
    return false;
  }
  final long connectionAge = System.currentTimeMillis() - lastConnectionTime;
  final boolean lifetimeExceeded = connectionAge >= connectionLifeTimeInMillis;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Reconnection status for Method: " + method.getName() + " is " + lifetimeExceeded);
  }
  return lifetimeExceeded;
}
/**
 * Relogin if login user is logged in using keytab.
 * Relogin is actually done by ugi code only if sufficient time has passed.
 * A no-op if kerberos security is not enabled.
 *
 * @throws MetaException if the login user cannot be obtained or the relogin fails; the
 *         underlying IOException is attached as the cause
 */
private void reloginExpiringKeytabUser() throws MetaException {
  if (!UserGroupInformation.isSecurityEnabled()) {
    return;
  }
  try {
    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
    // checkTGT calls ugi.relogin only after checking if it is close to tgt expiry
    // hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x)
    if (ugi.isFromKeytab()) {
      ugi.checkTGTAndReloginFromKeytab();
    }
  } catch (IOException e) {
    String msg = "Error doing relogin using keytab " + e.getMessage();
    LOG.error(msg, e);
    // MetaException only offers a message constructor, so attach the cause explicitly to
    // keep the original stack trace available to callers.
    MetaException relogFailure = new MetaException(msg);
    relogFailure.initCause(e);
    throw relogFailure;
  }
}
}