diff --git a/.gitignore b/.gitignore index afa05f62c27..7748935381f 100644 --- a/.gitignore +++ b/.gitignore @@ -74,6 +74,13 @@ ligradle/checkstyle/suppressions.xml gobblin-core/src/test/resources/serde/output-staging/ gobblin-integration-test-log-dir/ gobblin-modules/gobblin-elasticsearch/test-elasticsearch/ +gobblin-modules/*/bin/ +gobblin-metrics-libs/*/bin/ +gobblin-config-management/*/bin/ +gobblin-cluster/GobblinHelixJobLauncherTest/ +gobblin-restli/gobblin-restli-utils/bin/ +gobblin-test-harness/gobblin-test-harness/ + temp/ ligradle/* @@ -87,3 +94,4 @@ gobblin-integration-test-work-dir/ gobblin-test-utils/src/main/gen-avro/ gobblin-test-utils/src/main/gen-proto/ +gobblin_config/ diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java index 3c7875158d2..828906bfa3b 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java @@ -21,14 +21,22 @@ import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.ApplicationConstants; +import com.google.api.client.util.Lists; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -46,10 +54,60 @@ @Alpha @Slf4j public class GobblinClusterUtils { - public static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir"; + static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir"; + /** + * 
This template is resolved by replacing "$VALUE" with the name of a value that Gobblin recognizes. + * For more details, check {@link GobblinClusterUtils#setSystemProperties(Config)} + */ + private static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE + = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "systemPropertiesList.$VALUE"; + + /** + * This enum is used for specifying JVM options that Gobblin-Cluster will set whose value will need to be obtained + * in JVM runtime. + * e.g. YARN_CACHE will be used by Gobblin-on-YARN (an extension of Gobblin-Cluster) and resolved to a YARN-specific + * temporary location internal to the application. + * + * Note that we could specify a couple of keys associated with the value, meaning the value should only be resolved + * to associated keys but nothing else to avoid abusive usage. Users could also set resolved + * {@link #GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE} to expand default associated-key list. + * + * e.g. setting `gobblin.cluster.systemPropertiesList.YARN_CACHE` = "a,b" (comma-separated) expands the list to + * [java.io.tmpdir, a, b]. Only when a key is found in the associated-key list, then when you set + * {@link GobblinClusterConfigurationKeys#GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX}.${keyName}=YARN_CACHE, will the + * resolution for the -D${KeyName} = resolvedValue(YARN_CACHE) happen. + */ + public enum JVM_ARG_VALUE_RESOLVER { + YARN_CACHE { + @Override + public List getAssociatedKeys() { + return yarnCacheAssociatedKeys; + } + + @Override + public String getResolution() { + // When a key like java.io.tmpdir is configured to "YARN_CACHE", it sets the tmp dir to the Yarn container's cache location. + // This setting will only be useful when the cluster is deployed in Yarn mode. 
+ return System.getenv(ApplicationConstants.Environment.PWD.key()); + } + }; + + // Kept for backward-compatibility + private static List yarnCacheAssociatedKeys = ImmutableList.of(JAVA_TMP_DIR_KEY); - public enum TMP_DIR { - YARN_CACHE + // default associated key with the value. + public abstract List getAssociatedKeys() ; + + public abstract String getResolution(); + + public static boolean contains(String value) { // case-insensitive match against the constant names + for (JVM_ARG_VALUE_RESOLVER v : JVM_ARG_VALUE_RESOLVER.values()) { + if (v.name().equalsIgnoreCase(value)) { + return true; + } + } + return false; + } } /** @@ -122,23 +180,39 @@ public static Path getJobStateFilePath(boolean usingStateStore, Path appWorkPath * @param config */ public static void setSystemProperties(Config config) { - Properties properties = ConfigUtils.configToProperties(ConfigUtils.getConfig(config, GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX, - ConfigFactory.empty())); + Properties properties = ConfigUtils.configToProperties(ConfigUtils.getConfig(config, + GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX, ConfigFactory.empty())); for (Map.Entry entry: properties.entrySet()) { - if (entry.getKey().toString().equals(JAVA_TMP_DIR_KEY)) { - if (entry.getValue().toString().equalsIgnoreCase(TMP_DIR.YARN_CACHE.toString())) { - //When java.io.tmpdir is configured to "YARN_CACHE", it sets the tmp dir to the Yarn container's cache location. - // This setting will only be useful when the cluster is deployed in Yarn mode. 
- log.info("Setting tmp directory to: {}", System.getenv(ApplicationConstants.Environment.PWD.key())); - System.setProperty(entry.getKey().toString(), System.getenv(ApplicationConstants.Environment.PWD.key())); + if (JVM_ARG_VALUE_RESOLVER.contains(entry.getValue().toString())) { + JVM_ARG_VALUE_RESOLVER enumMember = JVM_ARG_VALUE_RESOLVER.valueOf(entry.getValue().toString().toUpperCase(java.util.Locale.ROOT)); // contains() ignores case, valueOf() does not + List allowedKeys = new ArrayList<>(enumMember.getAssociatedKeys()); + allowedKeys.addAll(getAdditionalKeys(enumMember.name(), config)); + + if (allowedKeys.contains(entry.getKey().toString())) { + log.info("Setting system property {} to: {}", entry.getKey(), enumMember.getResolution()); + System.setProperty(entry.getKey().toString(), enumMember.getResolution()); continue; + } else { + log.warn("String {} not being registered for dynamic JVM-arg resolution, " + + "considering add it by setting extension key", entry.getKey()); } } System.setProperty(entry.getKey().toString(), entry.getValue().toString()); } } + private static Collection getAdditionalKeys(String value, Config config) { + String resolvedKey = GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE.replace("$VALUE", value); + if (config.hasPath(resolvedKey)) { // the configured value is read as one comma-separated string of keys + return StreamSupport.stream( + Splitter.on(",").trimResults().omitEmptyStrings().split(config.getString(resolvedKey)).spliterator(), false + ).collect(Collectors.toList()); + } else { + return Lists.newArrayList(); + } + } + /** * Get the dynamic config from a {@link DynamicConfigGenerator} * @param config input config diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java index 4f58b2ebbcc..5ff1a3a0845 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java @@ -31,6 +31,7 @@ import org.apache.gobblin.util.PathUtils; +import 
static org.apache.gobblin.cluster.GobblinClusterUtils.JAVA_TMP_DIR_KEY; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -77,6 +78,25 @@ public void testSetSystemProperties() { Assert.assertEquals(System.getProperty("prop1"), "val1"); Assert.assertEquals(System.getProperty("prop2"), "val2"); Assert.assertEquals(System.getProperty("prop3"), "val3"); + + // Test specifically for key resolution using YARN_CACHE as the example. + config = config.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + "." + + JAVA_TMP_DIR_KEY, ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name())) + .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + ".randomKey1", + ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name())) + .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + ".randomKey2", + ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name())) + .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + ".rejectedKey", + ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name())) + .withValue("gobblin.cluster.systemPropertiesList.YARN_CACHE", ConfigValueFactory.fromAnyRef("randomKey1,randomKey2")); + GobblinClusterUtils.setSystemProperties(config); + Assert.assertEquals(System.getProperty(JAVA_TMP_DIR_KEY), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution()); + Assert.assertEquals(System.getProperty("randomKey1"), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution()); + Assert.assertEquals(System.getProperty("randomKey2"), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution()); + // For keys not being added in the list of `gobblin.cluster.systemPropertiesList.YARN_CACHE`, the value won't + // be 
resolved. + Assert.assertEquals(System.getProperty("rejectedKey"), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()); + } }