Skip to content

Commit

Permalink
[GOBBLIN-1372] Generalization of GobblinClusterUtils#setSystemProperties
Browse files Browse the repository at this point in the history
Closes #3213 from autumnust/GOBBLIN-1372
  • Loading branch information
autumnust committed Jan 26, 2021
1 parent b021e9c commit 26c22ef
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 11 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Expand Up @@ -74,6 +74,13 @@ ligradle/checkstyle/suppressions.xml
gobblin-core/src/test/resources/serde/output-staging/
gobblin-integration-test-log-dir/
gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
gobblin-modules/*/bin/
gobblin-metrics-libs/*/bin/
gobblin-config-management/*/bin/
gobblin-cluster/GobblinHelixJobLauncherTest/
gobblin-restli/gobblin-restli-utils/bin/
gobblin-test-harness/gobblin-test-harness/


temp/
ligradle/*
Expand All @@ -87,3 +94,4 @@ gobblin-integration-test-work-dir/
gobblin-test-utils/src/main/gen-avro/
gobblin-test-utils/src/main/gen-proto/

gobblin_config/
Expand Up @@ -21,14 +21,22 @@
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;

import com.google.api.client.util.Lists;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

Expand All @@ -46,10 +54,60 @@
@Alpha
@Slf4j
public class GobblinClusterUtils {
public static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir";
static final String JAVA_TMP_DIR_KEY = "java.io.tmpdir";
/**
* This template is resolved by replacing "$VALUE" with a value that Gobblin recognizes, e.g. YARN_CACHE.
* For more details, check {@link GobblinClusterUtils#setSystemProperties(Config)}
*/
private static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE
= GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "systemPropertiesList.$VALUE";

/**
* This enum specifies JVM option values that Gobblin-Cluster will set and whose actual value needs to be obtained
* at JVM runtime.
* e.g. YARN_CACHE is used by Gobblin-on-YARN (an extension of Gobblin-Cluster) and is resolved to a YARN-specific
* temporary location internal to the application.
*
* Note that each value has a list of associated keys: the value is resolved only for those keys and for nothing
* else, to avoid abusive usage. Users can also set the resolved
* {@link #GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE} to extend the default associated-key list.
*
* e.g. setting `gobblin.cluster.systemPropertiesList.YARN_CACHE` = "a,b" (a comma-separated string) expands the
* associated-key list to [java.io.tmpdir, a, b]. Only when a key is found in the associated-key list will setting
* {@link GobblinClusterConfigurationKeys#GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX}.${keyName}=YARN_CACHE
* cause -D${keyName} to be set to resolvedValue(YARN_CACHE).
*/
public enum JVM_ARG_VALUE_RESOLVER {
// Placeholder value that is replaced by the value of the environment variable named by
// ApplicationConstants.Environment.PWD.
YARN_CACHE {
@Override
public List<String> getAssociatedKeys() {
return yarnCacheAssociatedKeys;
}

@Override
public String getResolution() {
// When a key such as java.io.tmpdir is configured to "YARN_CACHE", the key is set to the Yarn container's
// cache location.
// This setting will only be useful when the cluster is deployed in Yarn mode.
// System.getenv returns null when the variable is not defined, so callers may receive null here.
return System.getenv(ApplicationConstants.Environment.PWD.key());
}
};

// Kept for backward-compatibility
// Default keys for which YARN_CACHE may be resolved: only java.io.tmpdir.
private static List<String> yarnCacheAssociatedKeys = ImmutableList.of(JAVA_TMP_DIR_KEY);

// NOTE(review): the next two lines look like the removed pre-change TMP_DIR declaration left over from the
// diff view rather than part of this enum; confirm before compiling.
public enum TMP_DIR {
YARN_CACHE
// Default keys associated with this value, i.e. the keys for which the value is allowed to be resolved.
public abstract List<String> getAssociatedKeys() ;

// The runtime value that replaces this placeholder.
public abstract String getResolution();

// Returns true when the given string equals the name of any member of this enum, ignoring case.
// NOTE(review): Enum.valueOf is case-sensitive, so a value accepted here is not guaranteed to be accepted
// by valueOf.
public static boolean contains(String value) {
for (JVM_ARG_VALUE_RESOLVER v : JVM_ARG_VALUE_RESOLVER.values()) {
if (v.name().equalsIgnoreCase(value)) {
return true;
}
}
return false;
}
}

/**
Expand Down Expand Up @@ -122,23 +180,39 @@ public static Path getJobStateFilePath(boolean usingStateStore, Path appWorkPath
* @param config
*/
public static void setSystemProperties(Config config) {
Properties properties = ConfigUtils.configToProperties(ConfigUtils.getConfig(config, GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX,
ConfigFactory.empty()));
Properties properties = ConfigUtils.configToProperties(ConfigUtils.getConfig(config,
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX, ConfigFactory.empty()));

for (Map.Entry<Object, Object> entry: properties.entrySet()) {
if (entry.getKey().toString().equals(JAVA_TMP_DIR_KEY)) {
if (entry.getValue().toString().equalsIgnoreCase(TMP_DIR.YARN_CACHE.toString())) {
//When java.io.tmpdir is configured to "YARN_CACHE", it sets the tmp dir to the Yarn container's cache location.
// This setting will only be useful when the cluster is deployed in Yarn mode.
log.info("Setting tmp directory to: {}", System.getenv(ApplicationConstants.Environment.PWD.key()));
System.setProperty(entry.getKey().toString(), System.getenv(ApplicationConstants.Environment.PWD.key()));
if (JVM_ARG_VALUE_RESOLVER.contains(entry.getValue().toString())) {
JVM_ARG_VALUE_RESOLVER enumMember = JVM_ARG_VALUE_RESOLVER.valueOf(entry.getValue().toString());
List<String> allowedKeys = new ArrayList<>(enumMember.getAssociatedKeys());
allowedKeys.addAll(getAdditionalKeys(entry.getValue().toString(), config));

if (allowedKeys.contains(entry.getKey().toString())) {
log.info("Setting tmp directory to: {}", enumMember.getResolution());
System.setProperty(entry.getKey().toString(), enumMember.getResolution());
continue;
} else {
log.warn("String {} not being registered for dynamic JVM-arg resolution, "
+ "considering add it by setting extension key", entry.getKey());
}
}
System.setProperty(entry.getKey().toString(), entry.getValue().toString());
}
}

/**
 * Reads the user-configured extension keys for a resolver value.
 *
 * The config path is built by substituting {@code value} for "$VALUE" in
 * {@link #GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE}; its string value is split on commas, with each
 * token trimmed and empty tokens dropped.
 *
 * @param value the resolver value whose extension list is looked up
 * @param config the config to read the extension list from
 * @return the extension keys, or an empty list when the path is not set
 */
private static Collection<String> getAdditionalKeys(String value, Config config) {
  String listPath = GOBBLIN_CLUSTER_SYSTEM_PROPERTY_LIST_TEMPLATE.replace("$VALUE", value);
  if (!config.hasPath(listPath)) {
    return Lists.newArrayList();
  }

  Iterable<String> tokens =
      Splitter.on(",").trimResults().omitEmptyStrings().split(config.getString(listPath));
  return StreamSupport.stream(tokens.spliterator(), false).collect(Collectors.toList());
}

/**
* Get the dynamic config from a {@link DynamicConfigGenerator}
* @param config input config
Expand Down
Expand Up @@ -31,6 +31,7 @@

import org.apache.gobblin.util.PathUtils;

import static org.apache.gobblin.cluster.GobblinClusterUtils.JAVA_TMP_DIR_KEY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -77,6 +78,25 @@ public void testSetSystemProperties() {
Assert.assertEquals(System.getProperty("prop1"), "val1");
Assert.assertEquals(System.getProperty("prop2"), "val2");
Assert.assertEquals(System.getProperty("prop3"), "val3");

// Test specifically for key resolution using YARN_CACHE as the example.
config = config.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + "." +
JAVA_TMP_DIR_KEY, ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + ".randomKey1",
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + ".randomKey2",
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
.withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX + ".rejectedKey",
ConfigValueFactory.fromAnyRef(GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name()))
.withValue("gobblin.cluster.systemPropertiesList.YARN_CACHE", ConfigValueFactory.fromAnyRef("randomKey1,randomKey2"));
GobblinClusterUtils.setSystemProperties(config);
Assert.assertEquals(System.getProperty(JAVA_TMP_DIR_KEY), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
Assert.assertEquals(System.getProperty("randomKey1"), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
Assert.assertEquals(System.getProperty("randomKey2"), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.getResolution());
// For keys that are not added to the list in `gobblin.cluster.systemPropertiesList.YARN_CACHE`, the value won't
// be resolved.
Assert.assertEquals(System.getProperty("rejectedKey"), GobblinClusterUtils.JVM_ARG_VALUE_RESOLVER.YARN_CACHE.name());

}

}

0 comments on commit 26c22ef

Please sign in to comment.