Skip to content

Commit

Permalink
Added delegation when connection is already open (#241)
Browse files Browse the repository at this point in the history
* Added delegation when connection is already open

* removed default WD ugi
  • Loading branch information
Patrick Duin committed May 18, 2022
1 parent 381b371 commit 89ba026
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 25 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## [3.10.4] - 2022-04-17
### Fixes
* More tuning of delayed `set_ugi` calls.

## [3.10.3] - 2022-04-16
### Fixes
* Potential exception when `set_ugi` has immutable list or null-value groups argument.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private static class ReconnectingMetastoreClientInvocationHandler implements Inv
private final String name;
private final int maxRetries;

private HiveUgiArgs cachedUgi = HiveUgiArgs.WAGGLE_DANCE_DEFAULT;
private HiveUgiArgs cachedUgi = null;

private ReconnectingMetastoreClientInvocationHandler(
String name,
Expand Down Expand Up @@ -77,29 +77,40 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
String user = (String) args[0];
List<String> groups = (List<String>) args[1];
cachedUgi = new HiveUgiArgs(user, groups);
return Lists.newArrayList(user);
if (base.isOpen()) {
LOG
.info("calling #set_ugi (on already open client) for user '{}', on metastore {}", cachedUgi.getUser(),
name);
return doRealCall(method, args, attempt);
} else {
// delay call until we get the next non set_ugi call, this helps doing unnecessary calls to Federated
// Metastores.
return Lists.newArrayList(user);
}
default:
base.open(cachedUgi);
do {
try {
return method.invoke(base.getClient(), args);
} catch (InvocationTargetException e) {
Throwable realException = e.getTargetException();
if (TTransportException.class.isAssignableFrom(realException.getClass())) {
if (attempt < maxRetries && shouldRetry(method)) {
LOG.debug("TTransportException captured in client {}. Reconnecting... ", name);
base.reconnect(cachedUgi);
continue;
}
throw new MetastoreUnavailableException("Client " + name + " is not available", realException);
return doRealCall(method, args, attempt);
}
}

private Object doRealCall(Method method, Object[] args, int attempt) throws IllegalAccessException, Throwable {
do {
try {
return method.invoke(base.getClient(), args);
} catch (InvocationTargetException e) {
Throwable realException = e.getTargetException();
if (TTransportException.class.isAssignableFrom(realException.getClass())) {
if (attempt < maxRetries && shouldRetry(method)) {
LOG.debug("TTransportException captured in client {}. Reconnecting... ", name);
base.reconnect(cachedUgi);
continue;
}
throw realException;
throw new MetastoreUnavailableException("Client " + name + " is not available", realException);
}
} while (++attempt <= maxRetries);
break;
}
throw realException;
}
} while (++attempt <= maxRetries);
throw new RuntimeException("Unreachable code");

}

private boolean shouldRetry(Method method) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,12 @@ void open(HiveUgiArgs ugiArgs) {
+ CONN_COUNT.incrementAndGet());

isConnected = true;
LOG.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store);
client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups());
if (ugiArgs != null) {
LOG.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store);
client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups());
} else {
LOG.debug("Connection opened with out #set_ugi call', on URI {}", store);
}
} catch (TException e) {
te = e;
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import static com.hotels.bdp.waggledance.client.HiveUgiArgs.WAGGLE_DANCE_DEFAULT;
import static com.hotels.bdp.waggledance.client.HiveUgiArgsStub.TEST_ARGS;

import java.util.List;
Expand Down Expand Up @@ -67,7 +66,7 @@ public void isOpenWithReconnection() {

boolean result = iface.isOpen();
assertThat(result, is(true));
verify(base).reconnect(WAGGLE_DANCE_DEFAULT);
verify(base).reconnect(null);
}

@Test
Expand Down Expand Up @@ -108,8 +107,8 @@ public void defaultMethodCallThrowsTransportExceptionRetries() throws TException

String result = iface.getName();
assertThat(result, is("ourName"));
verify(base).open(WAGGLE_DANCE_DEFAULT);
verify(base).reconnect(WAGGLE_DANCE_DEFAULT);
verify(base).open(null);
verify(base).reconnect(null);
}

@Test
Expand All @@ -127,6 +126,29 @@ public void set_ugi_before_call() throws Exception {
verify(base).reconnect(TEST_ARGS);
}

@Test
public void set_ugi_CachedWhenClosed() throws Exception {
when(base.isOpen()).thenReturn(false);

CloseableThriftHiveMetastoreIface iface = factory.newInstance("name", RECONNECTION_RETRIES, base);
List<String> setUgiResult = iface.set_ugi(TEST_ARGS.getUser(), TEST_ARGS.getGroups());
assertThat(setUgiResult, is(Lists.newArrayList(TEST_ARGS.getUser())));

verify(base, never()).open(TEST_ARGS);
verify(base, never()).reconnect(TEST_ARGS);
}

@Test
public void set_ugi_CalledWhenOpen() throws Exception {
when(base.getClient()).thenReturn(client);
when(base.isOpen()).thenReturn(true);
when(client.set_ugi(TEST_ARGS.getUser(), TEST_ARGS.getGroups())).thenReturn(Lists.newArrayList("users!"));

CloseableThriftHiveMetastoreIface iface = factory.newInstance("name", RECONNECTION_RETRIES, base);
List<String> setUgiResult = iface.set_ugi(TEST_ARGS.getUser(), TEST_ARGS.getGroups());
assertThat(setUgiResult, is(Lists.newArrayList("users!")));
}

@Test(expected = MetastoreUnavailableException.class)
public void shutdownThrowsTransportExceptionNoRetry() throws TException {
when(base.getClient()).thenReturn(client);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright (C) 2016-2022 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.waggledance.client;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down

0 comments on commit 89ba026

Please sign in to comment.