Skip to content

Commit

Permalink
[#983] improvement(tez): Optimize tez client delivery configuration (#…
Browse files Browse the repository at this point in the history
…985)

### What changes were proposed in this pull request?

Three configuration improvements are made in this issue.

- 1. Currently, the Tez client uses rss_conf.xml to deliver configuration. Now that #966 has been applied, we can deliver configuration through the edge conf.
- 2. Deliver dynamic configuration from the coordinator, then override the Tez client configuration.
- 3. Deliver configuration from the client side.

### Why are the changes needed?

- 1. rss_conf.xml is unnecessary.
- 2. Dynamic configuration from the coordinator is not applied.
- 3. Configuration on the client side cannot be delivered to input/output.

### How was this patch tested?

integration test, unit test, test in yarn cluster, test in tez local mode.
  • Loading branch information
zhengchenyu committed Jul 4, 2023
1 parent ff2128f commit 3bd2315
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 222 deletions.
10 changes: 8 additions & 2 deletions client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.tez.common;

import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;

import org.apache.uniffle.client.util.RssClientConfig;
Expand Down Expand Up @@ -153,8 +155,6 @@ public class RssTezConfig {
public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE =
RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE;

public static final String RSS_CONF_FILE = "rss_conf.xml";

public static final String RSS_REMOTE_STORAGE_PATH =
TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH;

Expand All @@ -169,6 +169,12 @@ public class RssTezConfig {

public static final String RSS_REDUCE_INITIAL_MEMORY = TEZ_RSS_CONFIG_PREFIX + "rss.reduce.initial.memory";

public static final String RSS_ACCESS_TIMEOUT_MS = TEZ_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS;
public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE = RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE;

/**
 * Configuration keys whose dynamically delivered value always wins:
 * {@code RssTezUtils.applyDynamicClientConf} overwrites these keys even when the
 * target configuration already holds a non-empty value for them.
 */
public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
ImmutableSet.of(RSS_STORAGE_TYPE, RSS_REMOTE_STORAGE_PATH);

public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
Expand Down
36 changes: 36 additions & 0 deletions client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,4 +393,40 @@ public static String replaceRssInputClassName(String className) {
return className;
}
}

/**
 * Merges dynamically supplied configuration items into the given Tez configuration.
 *
 * <p>Every key is normalized to carry {@link RssTezConfig#TEZ_RSS_CONFIG_PREFIX}. A value is
 * written only when the configuration has no non-empty value for the normalized key, or when
 * the key is listed in {@link RssTezConfig#RSS_MANDATORY_CLUSTER_CONF}; otherwise the existing
 * value is kept.
 *
 * @param conf the configuration to update in place; when {@code null} a warning is logged and
 *     nothing else happens
 * @param confItems the dynamic key/value pairs; when {@code null} or empty a warning is logged
 *     and nothing else happens
 */
public static void applyDynamicClientConf(Configuration conf, Map<String, String> confItems) {
  if (conf == null) {
    LOG.warn("Tez conf is null");
    return;
  }

  if (confItems == null || confItems.isEmpty()) {
    LOG.warn("Empty conf items");
    return;
  }

  for (Map.Entry<String, String> item : confItems.entrySet()) {
    final String rawKey = item.getKey();
    // Dynamic items may arrive without the Tez-specific prefix; add it when missing.
    final String prefixedKey = rawKey.startsWith(RssTezConfig.TEZ_RSS_CONFIG_PREFIX)
        ? rawKey
        : RssTezConfig.TEZ_RSS_CONFIG_PREFIX + rawKey;
    final String dynamicValue = item.getValue();

    // Keep an existing non-empty value unless the key is mandatory.
    if (!StringUtils.isEmpty(conf.get(prefixedKey, ""))
        && !RssTezConfig.RSS_MANDATORY_CLUSTER_CONF.contains(prefixedKey)) {
      continue;
    }

    LOG.warn("Use conf dynamic conf {} = {}", prefixedKey, dynamicValue);
    conf.set(prefixedKey, dynamicValue);
  }
}

/**
 * Builds a new configuration holding only the entries of {@code extraConf} whose key starts
 * with {@link RssTezConfig#TEZ_RSS_CONFIG_PREFIX}.
 *
 * @param extraConf the configuration to read entries from
 * @return a configuration created with {@code new Configuration(false)} and populated with the
 *     matching entries
 */
public static Configuration filterRssConf(Configuration extraConf) {
  final Configuration rssOnly = new Configuration(false);
  for (Map.Entry<String, String> property : extraConf) {
    final String name = property.getKey();
    if (!name.startsWith(RssTezConfig.TEZ_RSS_CONFIG_PREFIX)) {
      // Not prefixed as an RSS setting; leave it out.
      continue;
    }
    rssOnly.set(name, property.getValue());
  }
  return rssOnly;
}
}
25 changes: 7 additions & 18 deletions client-tez/src/main/java/org/apache/tez/common/UmbilicalUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
Expand All @@ -38,24 +35,15 @@

import org.apache.uniffle.common.ShuffleServerInfo;

import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;

public class UmbilicalUtils {
private static final Logger LOG = LoggerFactory.getLogger(UmbilicalUtils.class);

private UmbilicalUtils() {
}

/**
 * Reads the shuffle manager address and port from a {@code JobConf} built from
 * {@link RssTezConfig#RSS_CONF_FILE}.
 *
 * @return a pair of (host, port); the host falls back to {@code "null host"} and the port to
 *     {@code -1} when the corresponding key is not set
 */
private static Pair<String, Integer> getAmHostPort() {
  final JobConf amConf = new JobConf(RssTezConfig.RSS_CONF_FILE);
  final String amHost = amConf.get(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS, "null host");
  final int amPort = amConf.getInt(RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT, -1);
  LOG.info("Got RssConf am info, host is: {}, port is: {}", amHost, amPort);
  return new ImmutablePair<>(amHost, amPort);
}

/**
*
* @param applicationId Application Id of this task
Expand All @@ -74,8 +62,9 @@ private static Map<Integer, List<ShuffleServerInfo>> doRequestShuffleServer(
int shuffleId) throws IOException, InterruptedException, TezException {
UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(applicationId.toString());

Pair<String, Integer> amHostPort = getAmHostPort();
final InetSocketAddress address = NetUtils.createSocketAddrForHost(amHostPort.getLeft(), amHostPort.getRight());
String host = conf.get(RSS_AM_SHUFFLE_MANAGER_ADDRESS);
int port = conf.getInt(RSS_AM_SHUFFLE_MANAGER_PORT, -1);
final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
TezRemoteShuffleUmbilicalProtocol umbilical = taskOwner
.doAs(new PrivilegedExceptionAction<TezRemoteShuffleUmbilicalProtocol>() {
@Override
Expand All @@ -92,7 +81,7 @@ public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
.getShuffleAssignmentsInfo()
.getPartitionToServers();
LOG.info("RequestShuffleServer applicationId:{}, taskAttemptId:{}, host:{}, port:{}, shuffleId:{}, worker:{}",
applicationId, taskAttemptId, amHostPort.getLeft(), amHostPort.getRight(), shuffleId, partitionToServers);
applicationId, taskAttemptId, host, port, shuffleId, partitionToServers);
return partitionToServers;
}

Expand Down
Loading

0 comments on commit 3bd2315

Please sign in to comment.