
[FLINK-5364] [security] Rework JAAS configuration to support user-supplied entries

Fixes FLINK-5364, FLINK-5361, FLINK-5350, FLINK-5055

This closes #3057
1 parent 03b62ae commit fc3a778c0cafe1adc9efbd8796a8bd64122e4ad2 by EronWright, committed with StephanEwen on Dec 20, 2016
Showing with 1,184 additions and 762 deletions.
  1. +96 −37 docs/internals/flink_security.md
  2. +38 −17 docs/setup/config.md
  3. +4 −4 ...r-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
  4. +0 −14 flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
  5. +0 −8 flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
  6. +62 −0 flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
  7. +12 −9 flink-dist/src/main/resources/flink-conf.yaml
  8. +21 −0 flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
  9. +0 −2 ...s/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
  10. +0 −2 ...k-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
  11. +7 −7 flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java
  12. +111 −0 flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
  13. +0 −160 flink-runtime/src/main/java/org/apache/flink/runtime/security/JaasConfiguration.java
  14. +125 −0 flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
  15. +135 −187 flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
  16. +119 −0 flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
  17. +146 −0 flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
  18. +59 −0 flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
  19. +76 −0 flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
  20. +3 −6 flink-runtime/src/main/resources/flink-jaas.conf
  21. +3 −2 ...k-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
  22. +13 −17 ...test/java/org/apache/flink/runtime/security/{JaasConfigurationTest.java → KerberosUtilsTest.java}
  23. +36 −69 flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
  24. +11 −37 ...utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
  25. +0 −106 ...ls-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
  26. +16 −22 ...tils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
  27. +5 −5 flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
  28. +3 −2 flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
  29. +16 −10 flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
  30. +16 −9 flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
  31. +19 −12 flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
  32. +16 −9 flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
  33. +16 −9 flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -24,64 +24,123 @@ specific language governing permissions and limitations
under the License.
-->
-This document briefly describes how Flink security works in the context of various deployment mechanism (Standalone/Cluster vs YARN)
-and the connectors that participates in Flink Job execution stage. This documentation can be helpful for both administrators and developers
-who plans to run Flink on a secure environment.
+This document briefly describes how Flink security works in the context of various deployment mechanisms (Standalone, YARN, or Mesos),
+filesystems, connectors, and state backends.
## Objective
+The primary goals of the Flink Kerberos security infrastructure are:
+1. to enable secure data access for jobs within a cluster via connectors (e.g. Kafka)
+2. to authenticate to ZooKeeper (if configured to use SASL)
+3. to authenticate to Hadoop components (e.g. HDFS, HBase)
-The primary goal of Flink security model is to enable secure data access for jobs within a cluster via connectors. In a production deployment scenario,
-streaming jobs are understood to run for longer period of time (days/weeks/months) and the system must be able to authenticate against secure
-data sources throughout the life of the job. The current implementation supports running Flink clusters (Job Manager/Task Manager/Jobs) under the
-context of a Kerberos identity based on Keytab credential supplied during deployment time. Any jobs submitted will continue to run in the identity of the cluster.
+In a production deployment scenario, streaming jobs are understood to run for long periods of time (days/weeks/months) and be able to authenticate to secure
+data sources throughout the life of the job. Kerberos keytabs do not expire in that timeframe, unlike a Hadoop delegation token
+or ticket cache entry.
+
+The current implementation supports running Flink clusters (Job Manager/Task Manager/jobs) with either a configured keytab credential
+or with Hadoop delegation tokens. Keep in mind that all jobs share the credential configured for a given cluster. To use a different keytab
+for a certain job, simply launch a separate Flink cluster with a different configuration. Numerous Flink clusters may run side-by-side in a YARN
+or Mesos environment.
## How Flink Security works
-Flink deployment includes running Job Manager/ZooKeeper, Task Manager(s), Web UI and Job(s). Jobs (user code) can be submitted through web UI and/or CLI.
-A Job program may use one or more connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.,) and each connector may have a specific security
-requirements (Kerberos, database based, SSL/TLS, custom etc.,). While satisfying the security requirements for all the connectors evolves over a period
-of time, at this time of writing, the following connectors/services are tested for Kerberos/Keytab based security.
+In concept, a Flink program may use first- or third-party connectors (Kafka, HDFS, Cassandra, Flume, Kinesis etc.) necessitating arbitrary authentication methods (Kerberos, SSL/TLS, username/password, etc.). While satisfying the security requirements for all connectors is an ongoing effort,
+Flink provides first-class support for Kerberos authentication only. The following services and connectors are tested for Kerberos authentication:
-- Kafka (0.9)
+- Kafka (0.9+)
- HDFS
+- HBase
- ZooKeeper
-Hadoop uses the UserGroupInformation (UGI) class to manage security. UGI is a static implementation that takes care of handling Kerberos authentication. The Flink bootstrap implementation
-(JM/TM/CLI) takes care of instantiating UGI with the appropriate security credentials to establish the necessary security context.
+Note that it is possible to enable the use of Kerberos independently for each service or connector. For example, the user may enable
+Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa. The shared element is the configuration of
+Kerberos credentials, which is then explicitly used by each component.
+
+The internal architecture is based on security modules (implementing `org.apache.flink.runtime.security.modules.SecurityModule`) which
+are installed at startup. The next section describes each security module.
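As a rough illustration of the module concept (the actual `SecurityModule` interface introduced by this commit may differ in method names and exception types), each module pairs an install step, run once at process startup, with an uninstall step used mainly by tests:

```java
// Hedged sketch of the security-module concept; the real
// org.apache.flink.runtime.security.modules.SecurityModule in this commit
// may declare different signatures and a dedicated exception type.
public class SecurityModuleSketch {

    interface SecurityModule {
        void install() throws Exception;   // establish process-wide security state
        void uninstall() throws Exception; // tear it down again (used in tests)
    }

    // A trivial module that only tracks whether it was installed.
    static class NoOpModule implements SecurityModule {
        private boolean installed;
        @Override public void install() { installed = true; }
        @Override public void uninstall() { installed = false; }
        boolean isInstalled() { return installed; }
    }

    public static void main(String[] args) throws Exception {
        NoOpModule module = new NoOpModule();
        module.install();
        System.out.println(module.isInstalled()); // prints "true"
    }
}
```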
+
+### Hadoop Security Module
+This module uses the Hadoop `UserGroupInformation` (UGI) class to establish a process-wide *login user* context. The login user is
+then used for all interactions with Hadoop, including HDFS, HBase, and YARN.
+
+If Hadoop security is enabled (in `core-site.xml`), the login user will have whatever Kerberos credential is configured. Otherwise,
+the login user conveys only the user identity of the OS account that launched the cluster.
+
+### JAAS Security Module
+This module provides a dynamic JAAS configuration to the cluster, making available the configured Kerberos credential to ZooKeeper,
+Kafka, and other such components that rely on JAAS.
+
+Note that the user may also provide a static JAAS configuration file using the mechanisms described in the [Java SE Documentation](http://docs.oracle.com/javase/7/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html). Static entries override any
+dynamic entries provided by this module.
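For illustration, a static JAAS configuration file supplied via `-Djava.security.auth.login.config=...` might look as follows; the entry name, keytab path, and principal here are placeholders, not values prescribed by Flink:

```
// Hypothetical example: keytab path and principal are placeholders
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/path/to/flink.keytab"
    principal="flink-user@EXAMPLE.COM";
};
```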
+
+### ZooKeeper Security Module
+This module configures certain process-wide ZooKeeper security-related settings, namely the ZooKeeper service name (default: `zookeeper`)
+and the JAAS login context name (default: `Client`).
+
+## Security Configuration
+
+### Flink Configuration
+The user's Kerberos ticket cache (managed with `kinit`) is used automatically, based on the following configuration option:
-Services like Kafka and ZooKeeper use SASL/JAAS based authentication mechanism to authenticate against a Kerberos server. It expects JAAS configuration with a platform-specific login
-module *name* to be provided. Managing per-connector configuration files will be an overhead and to overcome this requirement, a process-wide JAAS configuration object is
-instantiated which serves standard ApplicationConfigurationEntry for the connectors that authenticates using SASL/JAAS mechanism.
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from the user's Kerberos ticket cache (default: `true`).
-It is important to understand that the Flink processes (JM/TM/UI/Jobs) itself uses UGI's doAS() implementation to run under a specific user context, i.e. if Hadoop security is enabled
-then the Flink processes will be running under a secure user account or else it will run as the OS login user account who starts the Flink cluster.
+A Kerberos keytab can be supplied by adding below configuration elements to the Flink configuration file:
-## Security Configurations
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-Secure credentials can be supplied by adding below configuration elements to Flink configuration file:
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-- `security.keytab`: Absolute path to Kerberos keytab file that contains the user credentials/secret.
+These configuration options establish a cluster-wide credential to be used in a Hadoop and/or JAAS context. Whether the credential is used in a Hadoop context is based on the Hadoop configuration (see next section). To be used in a JAAS context, the configuration specifies which JAAS *login contexts* (or *applications*) are enabled with the following configuration option:
-- `security.principal`: User principal name that the Flink cluster should run as.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client` to use the credentials for ZooKeeper authentication).
-The delegation token mechanism (*kinit cache*) is still supported for backward compatibility but enabling security using *keytab* configuration is the preferred and recommended approach.
+ZooKeeper-related configuration overrides:
-## Standalone Mode:
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`). Facilitates mutual-authentication between the client (Flink) and server.
+
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
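Putting these options together, a `flink-conf.yaml` fragment for a keytab-based setup might look as follows; the paths, principal, and login contexts are example values only:

```yaml
# Example values only; adjust paths, principal, and contexts for your environment
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
# Provide the credential to ZooKeeper ("Client") and Kafka ("KafkaClient")
security.kerberos.login.contexts: Client,KafkaClient
# Optional ZooKeeper overrides (shown here with their defaults)
zookeeper.sasl.service-name: zookeeper
zookeeper.sasl.login-context-name: Client
```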
+
+### Hadoop Configuration
+
+The Hadoop configuration is located via the `HADOOP_CONF_DIR` environment variable and by other means (see `org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils`). The Kerberos credential (configured above) is used automatically if Hadoop security is enabled.
+
+Note that Kerberos credentials found in the ticket cache aren't transferrable to other hosts. In this scenario, the Flink CLI acquires Hadoop
+delegation tokens (for HDFS and for HBase).
+
+## Deployment Modes
+The following sections provide information specific to each deployment mode.
+
+### Standalone Mode
Steps to run a secure Flink cluster in standalone/cluster mode:
-- Add security configurations to Flink configuration file (on all cluster nodes)
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration on all cluster nodes
-- Deploy Flink cluster using cluster start/stop scripts or CLI
+1. Add security-related configuration options to the Flink configuration file (on all cluster nodes).
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on all cluster nodes.
+3. Deploy Flink cluster as normal.
+
+### YARN/Mesos Mode
+
+Steps to run a secure Flink cluster in YARN/Mesos mode:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Ensure that the keytab file exists at the path indicated by `security.kerberos.login.keytab` on the client node.
+3. Deploy Flink cluster as normal.
+
+In YARN/Mesos mode, the keytab is automatically copied from the client to the Flink containers.
-## Yarn Mode:
+For more information, see <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">YARN security</a> documentation.
-Steps to run secure Flink cluster in Yarn mode:
-- Add security configurations to Flink configuration file (on the node from where cluster will be provisioned using Flink/Yarn CLI)
-- Make sure the Keytab file exist in the path as indicated in *security.keytab* configuration
-- Deploy Flink cluster using CLI
+#### Using `kinit` (YARN only)
-In Yarn mode, the user supplied keytab will be copied over to the Yarn containers (App Master/JM and TM) as the Yarn local resource file.
-Security implementation details are based on <a href="https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md">Yarn security</a>
+In YARN mode, it is possible to deploy a secure Flink cluster without a keytab, using only the ticket cache (as managed by `kinit`).
+This avoids the complexity of generating a keytab and avoids entrusting the cluster manager with it. The main drawback is
+that the cluster is necessarily short-lived since the generated delegation tokens will expire (typically within a week).
-## Token Renewal
+Steps to run a secure Flink cluster using `kinit`:
+1. Add security-related configuration options to the Flink configuration file on the client.
+2. Login using the `kinit` command.
+3. Deploy Flink cluster as normal.
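For example (the principal and session options below are illustrative; exact CLI flags depend on your Flink version):

```
# Obtain a Kerberos TGT for the submitting user (realm is an example)
kinit alice@EXAMPLE.COM
# Start a YARN session as usual; delegation tokens are derived from the TGT
./bin/yarn-session.sh -n 2 -jm 1024 -tm 2048
```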
-UGI and Kafka/ZK login module implementations takes care of auto-renewing the tickets upon reaching expiry and no further action is needed on the part of Flink.
+## Further Details
+### Ticket Renewal
+Each component that uses Kerberos is independently responsible for renewing the Kerberos ticket-granting-ticket (TGT).
+Hadoop, ZooKeeper, and Kafka all renew the TGT automatically when provided a keytab. In the delegation token scenario,
+YARN itself renews the token (up to its maximum lifespan).
@@ -96,37 +96,58 @@ These options are useful for debugging a Flink application for memory and garbag
- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
-### Kerberos
+### Kerberos-based Security
-Flink supports Kerberos authentication for the following services
+Flink supports Kerberos authentication for the following services:
-+ Hadoop Components: such as HDFS, YARN, or HBase.
++ Hadoop Components (such as HDFS, YARN, or HBase)
+ Kafka Connectors (version 0.9+)
-+ Zookeeper Server/Client
++ Zookeeper
-Hadoop components relies on the UserGroupInformation (UGI) implementation to handle Kerberos authentication, whereas Kafka and Zookeeper services handles Kerberos authentication through SASL/JAAS implementation.
-
-**Kerberos is only properly supported in Hadoop version 2.6.1 and above. All
+**Kerberos is supported only in Hadoop version 2.6.1 and above. All
other versions have critical bugs which might fail the Flink job
unexpectedly.**
-**Ticket cache** and **Keytab** modes are supported for all above mentioned services.
+Configuring Flink for Kerberos security involves three aspects:
+
+1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via `kinit`)
+2. Making the Kerberos credential available to components and connectors as needed
+3. Configuring the component and/or connector to use Kerberos authentication
+
+To provide the cluster with a Kerberos credential, either configure the login keytab using the below configuration options,
+or login using `kinit` before starting the cluster.
+
+It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues. If you prefer to use the ticket cache,
+talk to your administrator about increasing the Hadoop delegation token lifetime.
+
+- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from your Kerberos ticket cache (default: `true`).
+
+- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file that contains the user credentials.
-> Ticket cache (Supported only to provide backward compatibility support. Keytab is the preferred approach for long running jobs)
+- `security.kerberos.login.principal`: Kerberos principal name associated with the keytab.
-While Hadoop uses Kerberos tickets to authenticate users with services initially, the authentication process continues differently afterwards. Instead of saving the ticket to authenticate on a later access, Hadoop creates its own security tokens (DelegationToken) that it passes around. These are authenticated to Kerberos periodically but are independent of the token renewal time. The tokens have a maximum life span identical to the Kerberos ticket maximum life span.
+If Hadoop security is enabled (in `core-site.xml`), Flink will automatically use the configured Kerberos credentials when connecting to HDFS, HBase, and other Hadoop components.
-While using ticket cache mode, please make sure to set the maximum ticket life span high long running jobs.
+Make the Kerberos credentials available to any connector or component that uses a JAAS configuration file by configuring JAAS login contexts.
-If you are on YARN, then it is sufficient to authenticate the client with Kerberos. On a Flink standalone cluster you need to ensure that, initially, all nodes are authenticated with Kerberos using the `kinit` tool.
+- `security.kerberos.login.contexts`: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication).
+
+You may also provide a static JAAS configuration file, whose entries override those produced by the above configuration option.
+
+Be sure to configure the connector within your Flink program as necessary to use Kerberos authentication. For the Kafka connector,
+use the following properties:
+
+```
+security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+sasl.kerberos.service.name=kafka
+```
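To sketch how these properties would be assembled in a job, a minimal helper might look like the following; the broker address is a placeholder, and the Properties object would be handed to the Kafka connector (e.g. a `FlinkKafkaConsumer` constructor) as usual:

```java
import java.util.Properties;

// Hedged sketch: builds the Kafka client properties shown above.
// The broker address is a placeholder for your environment.
public class SecureKafkaProps {

    public static Properties create(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Kerberos-related settings from the example above
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }

    public static void main(String[] args) {
        Properties props = create("broker:9092");
        System.out.println(props.getProperty("security.protocol")); // prints "SASL_PLAINTEXT"
    }
}
```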
-> Keytab (security principal and keytab can be configured through Flink configuration file)
-- `security.keytab`: Path to Keytab file
-- `security.principal`: Principal associated with the keytab
+Flink provides some additional options to configure ZooKeeper security:
-Kerberos ticket renewal is abstracted and automatically handled by the Hadoop/Kafka/ZK login modules and ensures that tickets are renewed in time and you can be sure to be authenticated until the end of the ticket life time.
+- `zookeeper.sasl.service-name`: The Kerberos service name that the ZooKeeper cluster is configured to use (default: `zookeeper`).
-For Kafka and ZK, process-wide JAAS config will be created using the provided security credentials and the Kerberos authentication will be handled by Kafka/ZK login handlers.
+- `zookeeper.sasl.login-context-name`: The JAAS login context name that the ZooKeeper client uses to request the login context (default: `Client`). Should match
+one of the values specified in `security.kerberos.login.contexts`.
### Other
@@ -21,6 +21,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SecureTestEnvironment;
@@ -116,13 +117,12 @@ public static void startSecureCluster() throws Exception {
populateSecureConfigurations();
Configuration flinkConfig = new Configuration();
- flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
SecureTestEnvironment.getTestKeytab());
- flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+ flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
SecureTestEnvironment.getHadoopServicePrincipal());
- SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig);
- ctx.setHadoopConfiguration(conf);
+ SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, conf);
try {
TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
} catch (Exception e) {
@@ -1395,20 +1395,6 @@
/** The environment variable name which contains the Flink installation root directory */
public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
- // -------------------------------- Security -------------------------------
-
- /**
- * The config parameter defining security credentials required
- * for securing Flink cluster.
- */
-
- /** Keytab file key name to be used in flink configuration file */
- public static final String SECURITY_KEYTAB_KEY = "security.keytab";
-
- /** Kerberos security principal key name to be used in flink configuration file */
- public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
-
-
/**
* Not instantiable.
*/
@@ -124,14 +124,6 @@
.defaultValue(3)
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
- public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
- key("zookeeper.sasl.disable")
- .defaultValue(true);
-
- public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
- key("zookeeper.sasl.service-name")
- .noDefaultValue();
-
// ------------------------------------------------------------------------
/** Not intended to be instantiated */
