From 3d7e874a7d522b0bb398b5848d3f6e941df0b970 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Thu, 3 Dec 2015 01:53:27 -0500 Subject: [PATCH] ACCUMULO-4069 Ensure Kerberos relogins occur on server side and client side. --- .../apache/accumulo/core/conf/Property.java | 2 + .../apache/accumulo/core/rpc/ThriftUtil.java | 60 ++++++ .../server/security/SecurityUtil.java | 47 ++++- .../apache/accumulo/harness/TestingKdc.java | 13 +- .../test/functional/KerberosRenewalIT.java | 188 ++++++++++++++++++ 5 files changed, 303 insertions(+), 7 deletions(-) create mode 100644 test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index df53791937d..d8165b773dc 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -177,6 +177,8 @@ public enum Property { GENERAL_KERBEROS_KEYTAB("general.kerberos.keytab", "", PropertyType.PATH, "Path to the kerberos keytab to use. Leave blank if not using kerberoized hdfs"), GENERAL_KERBEROS_PRINCIPAL("general.kerberos.principal", "", PropertyType.STRING, "Name of the kerberos principal to use. _HOST will automatically be " + "replaced by the machines hostname in the hostname portion of the principal. Leave blank if not using kerberoized hdfs"), + GENERAL_KERBEROS_RENEWAL_PERIOD("general.kerberos.renewal.period", "30s", PropertyType.TIMEDURATION, "The amount of time between attempts to perform " + + "Kerberos ticket renewals. 
This does not equate to how often tickets are actually renewed (which is performed at 80% of the ticket lifetime)."), GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G", PropertyType.MEMORY, "The maximum size of a message that can be sent to a server."), GENERAL_SIMPLETIMER_THREADPOOL_SIZE("general.server.simpletimer.threadpool.size", "1", PropertyType.COUNT, "The number of threads to use for " + "server-internal scheduled tasks"), diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index 1b5c25613f2..000bca1701f 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -22,6 +22,7 @@ import java.security.KeyStore; import java.util.HashMap; import java.util.Map; +import java.util.Random; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; @@ -62,6 +63,9 @@ public class ThriftUtil { public static final String GSSAPI = "GSSAPI", DIGEST_MD5 = "DIGEST-MD5"; + private static final Random SASL_BACKOFF_RAND = new Random(); + private static final int RELOGIN_MAX_BACKOFF = 5000; + /** * An instance of {@link TraceProtocolFactory} * @@ -296,6 +300,15 @@ public static TTransport createClientTransport(HostAndPort address, int timeout, // Open the transport transport.open(); + } catch (TTransportException e) { + log.warn("Failed to open SASL transport", e); + + // We might have had a valid ticket, but it expired. We'll let the caller retry, but we will attempt to re-login to make the next attempt work. + // Sadly, we have no way to determine the actual reason we got this TTransportException other than inspecting the exception msg. 
+ log.debug("Caught TTransportException opening SASL transport, checking if re-login is necessary before propagating the exception."); + attemptClientReLogin(); + + throw e; } catch (IOException e) { log.warn("Failed to open SASL transport", e); throw new TTransportException(e); @@ -327,6 +340,53 @@ public static TTransport createClientTransport(HostAndPort address, int timeout, return transport; } + /** + * Some wonderful snippets of documentation from HBase on performing the re-login client-side (as well as server-side) in the following paragraph. We want to + * attempt a re-login to automatically refresh the client's Krb "credentials" (remember, a server might also be a client, master sending RPC to tserver), but + * we have to take care to avoid Kerberos' replay attack protection. + *

+ * If multiple clients with the same principal try to connect to the same server at the same time, the server assumes a replay attack is in progress. This is + * a feature of kerberos. In order to work around this, what is done is that the client backs off randomly and tries to initiate the connection again. The + * other problem is to do with ticket expiry. To handle that, a relogin is attempted. + */ + static void attemptClientReLogin() { + try { + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + if (null == loginUser || !loginUser.hasKerberosCredentials()) { + // We should have already checked that we're logged in and have credentials. A precondition-like check. + throw new RuntimeException("Expected to find Kerberos UGI credentials, but did not"); + } + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + // A Proxy user is the "effective user" (in name only), riding on top of the "real user"'s Krb credentials. + UserGroupInformation realUser = currentUser.getRealUser(); + + // re-login only in case it is the login user or superuser. + if (loginUser.equals(currentUser) || loginUser.equals(realUser)) { + if (UserGroupInformation.isLoginKeytabBased()) { + log.info("Performing keytab-based Kerberos re-login"); + loginUser.reloginFromKeytab(); + } else { + log.info("Performing ticket-cache-based Kerberos re-login"); + loginUser.reloginFromTicketCache(); + } + + // Avoid the replay attack protection, sleep 1 to 5000ms + try { + Thread.sleep((SASL_BACKOFF_RAND.nextInt(RELOGIN_MAX_BACKOFF) + 1)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } else { + log.debug("Not attempting Kerberos re-login: loginUser={}, currentUser={}, realUser={}", loginUser, currentUser, realUser); + } + } catch (IOException e) { + // The inability to check is worrisome and deserves a RuntimeException instead of a propagated IO-like Exception. 
+ log.warn("Failed to check (and/or perform) Kerberos client re-login", e); + throw new RuntimeException(e); + } + } + /** * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters. diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java index e27a7e7ad96..ec859af6d75 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityUtil.java @@ -21,6 +21,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.fate.util.LoggingRunnable; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +32,7 @@ */ public class SecurityUtil { private static final Logger log = LoggerFactory.getLogger(SecurityUtil.class); + private static final Logger renewalLog = LoggerFactory.getLogger("KerberosTicketRenewal"); public static boolean usingKerberos = false; /** @@ -49,11 +52,10 @@ public static void serverLogin(AccumuloConfiguration acuConf) { if (login(principalConfig, keyTab)) { try { - // This spawns a thread to periodically renew the logged in (accumulo) user - UserGroupInformation.getLoginUser(); + startTicketRenewalThread(UserGroupInformation.getCurrentUser(), acuConf.getTimeInMillis(Property.GENERAL_KERBEROS_RENEWAL_PERIOD)); return; - } catch (IOException io) { - log.error("Error starting up renewal thread. 
This shouldn't be happenining.", io); + } catch (IOException e) { + log.error("Failed to obtain Kerberos user after successfully logging in", e); } } @@ -93,4 +95,41 @@ public static String getServerPrincipal(String configuredPrincipal) { throw new RuntimeException("Could not convert configured server principal: " + configuredPrincipal, e); } } + + /** + * Start a thread that periodically attempts to renew the current Kerberos user's ticket. + * + * @param ugi + * The current Kerberos user. + * @param renewalPeriod + * The amount of time between attempting renewals. + */ + static void startTicketRenewalThread(final UserGroupInformation ugi, final long renewalPeriod) { + Thread t = new Daemon(new LoggingRunnable(renewalLog, new Runnable() { + @Override + public void run() { + while (true) { + try { + renewalLog.debug("Invoking renewal attempt for Kerberos ticket"); + // While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime. + ugi.checkTGTAndReloginFromKeytab(); + } catch (IOException e) { + // Should failures to renew the ticket be retried more quickly? + renewalLog.error("Failed to renew Kerberos ticket", e); + } + + // Wait for a bit before checking again. 
+ try { + Thread.sleep(renewalPeriod); + } catch (InterruptedException e) { + renewalLog.error("Renewal thread interrupted", e); + Thread.currentThread().interrupt(); + return; + } + } + } + })); + t.setName("Kerberos Ticket Renewal"); + t.start(); + } } diff --git a/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java b/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java index 94712745b6e..06cd082b9e4 100644 --- a/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java +++ b/test/src/test/java/org/apache/accumulo/harness/TestingKdc.java @@ -39,6 +39,7 @@ public class TestingKdc { private static final Logger log = LoggerFactory.getLogger(TestingKdc.class); public static final int NUM_USERS = 10; + public static final long MAX_TICKET_LIFETIME_MILLIS = 86400000; // one day protected MiniKdc kdc = null; protected ClusterUser accumuloServerUser = null, accumuloAdmin = null; @@ -51,10 +52,10 @@ public class TestingKdc { private boolean started = false; public TestingKdc() throws Exception { - this(computeKdcDir(), computeKeytabDir()); + this(computeKdcDir(), computeKeytabDir(), MAX_TICKET_LIFETIME_MILLIS); } - private static File computeKdcDir() { + public static File computeKdcDir() { File targetDir = new File(System.getProperty("user.dir"), "target"); Assert.assertTrue("Could not find Maven target directory: " + targetDir, targetDir.exists() && targetDir.isDirectory()); @@ -66,7 +67,7 @@ private static File computeKdcDir() { return kdcDir; } - private static File computeKeytabDir() { + public static File computeKeytabDir() { File targetDir = new File(System.getProperty("user.dir"), "target"); Assert.assertTrue("Could not find Maven target directory: " + targetDir, targetDir.exists() && targetDir.isDirectory()); @@ -79,8 +80,13 @@ private static File computeKeytabDir() { } public TestingKdc(File kdcDir, File keytabDir) throws Exception { + this(kdcDir, keytabDir, MAX_TICKET_LIFETIME_MILLIS); + } + + public TestingKdc(File kdcDir, File 
keytabDir, long maxTicketLifetime) throws Exception { checkNotNull(kdcDir, "KDC directory was null"); checkNotNull(keytabDir, "Keytab directory was null"); + checkArgument(maxTicketLifetime > 0, "Ticket lifetime must be positive"); this.keytabDir = keytabDir; this.hostname = InetAddress.getLocalHost().getCanonicalHostName(); @@ -90,6 +96,7 @@ public TestingKdc(File kdcDir, File keytabDir) throws Exception { Properties kdcConf = MiniKdc.createConf(); kdcConf.setProperty(MiniKdc.ORG_NAME, ORG_NAME); kdcConf.setProperty(MiniKdc.ORG_DOMAIN, ORG_DOMAIN); + kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, Long.toString(maxTicketLifetime)); // kdcConf.setProperty(MiniKdc.DEBUG, "true"); kdc = new MiniKdc(kdcConf, kdcDir); } diff --git a/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java b/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java new file mode 100644 index 00000000000..19908f63701 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloIT; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.TestingKdc; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * MAC test 
which uses {@link MiniKdc} to simulate a secure environment. Can be used as a sanity check for Kerberos/SASL testing. + */ +public class KerberosRenewalIT extends AccumuloIT { + private static final Logger log = LoggerFactory.getLogger(KerberosRenewalIT.class); + + private static TestingKdc kdc; + private static String krbEnabledForITs = null; + private static ClusterUser rootUser; + + private static final long TICKET_LIFETIME = 6 * 60 * 1000; // Anything less seems to fail when generating the ticket + private static final long TICKET_TEST_LIFETIME = 8 * 60 * 1000; // Run a test for 8 mins + private static final long TEST_DURATION = 9 * 60 * 1000; // The test should finish within 9 mins + + @BeforeClass + public static void startKdc() throws Exception { + // 30s renewal time window + kdc = new TestingKdc(TestingKdc.computeKdcDir(), TestingKdc.computeKeytabDir(), TICKET_LIFETIME); + kdc.start(); + krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION); + if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true"); + } + rootUser = kdc.getRootUser(); + } + + @AfterClass + public static void stopKdc() throws Exception { + if (null != kdc) { + kdc.stop(); + } + if (null != krbEnabledForITs) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs); + } + } + + @Override + public int defaultTimeoutSeconds() { + return (int) TEST_DURATION / 1000; + } + + private MiniAccumuloClusterImpl mac; + + @Before + public void startMac() throws Exception { + MiniClusterHarness harness = new MiniClusterHarness(); + mac = harness.create(this, new PasswordToken("unused"), kdc, new MiniClusterConfigurationCallback() { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + Map site = cfg.getSiteConfig(); + site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); + // Reduce the period just 
to make sure we trigger renewal fast + site.put(Property.GENERAL_KERBEROS_RENEWAL_PERIOD.getKey(), "5s"); + cfg.setSiteConfig(site); + } + + }); + + mac.getConfig().setNumTservers(1); + mac.start(); + // Enabled kerberos auth + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + @After + public void stopMac() throws Exception { + if (null != mac) { + mac.stop(); + } + } + + // Intentionally setting the Test annotation timeout. We do not want to scale the timeout. + @Test(timeout = TEST_DURATION) + public void testReadAndWriteThroughTicketLifetime() throws Exception { + // Attempt to use Accumulo for a duration of time that exceeds the Kerberos ticket lifetime. + // This is a functional test to verify that Accumulo services renew their ticket. + // If the test doesn't finish on its own, this signifies that Accumulo services failed + // and the test should fail. If Accumulo services renew their ticket, the test case + // should exit gracefully on its own. + + // Login as the "root" user + UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + long duration = 0; + long last = System.currentTimeMillis(); + // Make sure we have a couple renewals happen + while (duration < TICKET_TEST_LIFETIME) { + // Create a table, write a record, compact, read the record, drop the table. 
+ createReadWriteDrop(conn); + // Wait a bit after + Thread.sleep(5000); + + // Update the duration + long now = System.currentTimeMillis(); + duration += now - last; + last = now; + } + } + + /** + * Creates a table, adds a record to it, and then compacts the table. A simple way to make sure that the system user exists (since the master does an RPC to + * the tserver which will create the system user if it doesn't already exist). + */ + private void createReadWriteDrop(Connector conn) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, TableExistsException { + final String table = testName.getMethodName() + "_table"; + conn.tableOperations().create(table); + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", "d"); + bw.addMutation(m); + bw.close(); + conn.tableOperations().compact(table, new CompactionConfig().setFlush(true).setWait(true)); + Scanner s = conn.createScanner(table, Authorizations.EMPTY); + Entry entry = Iterables.getOnlyElement(s); + assertEquals("Did not find the expected key", 0, new Key("a", "b", "c").compareTo(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL)); + assertEquals("d", entry.getValue().toString()); + conn.tableOperations().delete(table); + } +}