Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into YARN-11226-V2
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Nov 14, 2022
2 parents b62ec21 + a48e8c9 commit 61f9165
Show file tree
Hide file tree
Showing 28 changed files with 2,058 additions and 774 deletions.
4 changes: 2 additions & 2 deletions LICENSE-binary
Expand Up @@ -241,8 +241,8 @@ com.google.guava:guava:27.0-jre
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
com.microsoft.azure:azure-storage:7.0.0
com.nimbusds:nimbus-jose-jwt:9.8.1
com.squareup.okhttp3:okhttp:4.9.3
com.squareup.okio:okio:1.6.0
com.squareup.okhttp3:okhttp:4.10.0
com.squareup.okio:okio:3.2.0
com.zaxxer:HikariCP:4.0.3
commons-beanutils:commons-beanutils:1.9.3
commons-cli:commons-cli:1.2
Expand Down
3 changes: 3 additions & 0 deletions hadoop-client-modules/hadoop-client-runtime/pom.xml
Expand Up @@ -148,6 +148,7 @@
<!-- Leave javax APIs that are stable -->
<!-- the jdk ships part of the javax.annotation namespace, so if we want to relocate this we'll have to care it out by class :( -->
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>io.netty:*</exclude>
<exclude>io.dropwizard.metrics:metrics-core</exclude>
<exclude>org.eclipse.jetty:jetty-servlet</exclude>
<exclude>org.eclipse.jetty:jetty-security</exclude>
Expand All @@ -156,6 +157,8 @@
<exclude>org.bouncycastle:*</exclude>
<!-- Leave snappy that includes native methods which cannot be relocated. -->
<exclude>org.xerial.snappy:*</exclude>
<!-- leave out kotlin classes -->
<exclude>org.jetbrains.kotlin:*</exclude>
</excludes>
</artifactSet>
<filters>
Expand Down
5 changes: 5 additions & 0 deletions hadoop-common-project/hadoop-common/pom.xml
Expand Up @@ -383,6 +383,11 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
Expand Down
Expand Up @@ -208,7 +208,8 @@ The following table lists the configuration property names that are deprecated i
| mapred.task.profile.params | mapreduce.task.profile.params |
| mapred.task.profile.reduces | mapreduce.task.profile.reduces |
| mapred.task.timeout | mapreduce.task.timeout |
| mapred.tasktracker.indexcache.mb | mapreduce.tasktracker.indexcache.mb |
| mapred.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
| mapreduce.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
| mapred.tasktracker.map.tasks.maximum | mapreduce.tasktracker.map.tasks.maximum |
| mapred.tasktracker.memory\_calculator\_plugin | mapreduce.tasktracker.resourcecalculatorplugin |
| mapred.tasktracker.memorycalculatorplugin | mapreduce.tasktracker.resourcecalculatorplugin |
Expand Down
10 changes: 10 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
Expand Up @@ -37,6 +37,16 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<exclusions>
<exclusion>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
Expand Down
Expand Up @@ -520,7 +520,7 @@ public void addVolume(final StorageLocation location,

for (final NamespaceInfo nsInfo : nsInfos) {
String bpid = nsInfo.getBlockPoolID();
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try {
fsVolume.addBlockPool(bpid, this.conf, this.timer);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
Expand Down
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,7 +43,7 @@ class IndexCache {
public IndexCache(JobConf conf) {
this.conf = conf;
totalMemoryAllowed =
conf.getInt(TTConfig.TT_INDEX_CACHE, 10) * 1024 * 1024;
conf.getInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 10) * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
}

Expand Down
Expand Up @@ -577,6 +577,8 @@ public interface MRJobConfig {
public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;

public static final String SHUFFLE_INDEX_CACHE = "mapreduce.reduce.shuffle.indexcache.mb";

public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";

public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";
Expand Down
Expand Up @@ -29,6 +29,12 @@
@InterfaceStability.Evolving
public interface TTConfig extends MRConfig {

/**
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.MRJobConfig#SHUFFLE_INDEX_CACHE}
* instead
*/
@Deprecated
public static final String TT_INDEX_CACHE =
"mapreduce.tasktracker.indexcache.mb";
public static final String TT_MAP_SLOTS =
Expand Down
Expand Up @@ -53,14 +53,15 @@

import org.apache.hadoop.classification.VisibleForTesting;

class Fetcher<K,V> extends Thread {
@VisibleForTesting
public class Fetcher<K, V> extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);

/** Number of ms before timing out a copy */
/** Number of ms before timing out a copy. */
private static final int DEFAULT_STALLED_COPY_TIMEOUT = 3 * 60 * 1000;

/** Basic/unit connection timeout (in milliseconds) */
/** Basic/unit connection timeout (in milliseconds). */
private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;

/* Default read timeout (in milliseconds) */
Expand All @@ -72,19 +73,21 @@ class Fetcher<K,V> extends Thread {
private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";

protected final Reporter reporter;
private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
@VisibleForTesting
public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}

private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

@VisibleForTesting
public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
private final JobConf jobConf;
private final Counters.Counter connectionErrs;
private final Counters.Counter ioErrs;
private final Counters.Counter wrongLengthErrs;
private final Counters.Counter badIdErrs;
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
protected final MergeManager<K,V> merger;
protected final ShuffleSchedulerImpl<K,V> scheduler;
protected final MergeManager<K, V> merger;
protected final ShuffleSchedulerImpl<K, V> scheduler;
protected final ShuffleClientMetrics metrics;
protected final ExceptionReporter exceptionReporter;
protected final int id;
Expand All @@ -111,7 +114,7 @@ private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
private static SSLFactory sslFactory;

public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K, V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
this(job, reduceId, scheduler, merger, reporter, metrics,
Expand All @@ -120,7 +123,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,

@VisibleForTesting
Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
ShuffleSchedulerImpl<K, V> scheduler, MergeManager<K, V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey,
int id) {
Expand Down Expand Up @@ -315,9 +318,8 @@ protected void copyFromHost(MapHost host) throws IOException {
return;
}

if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+ maps);
if (LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + maps);
}

// List of maps to be fetched yet
Expand Down Expand Up @@ -411,8 +413,8 @@ private void openConnectionWithRetry(URL url) throws IOException {
shouldWait = false;
} catch (IOException e) {
if (!fetchRetryEnabled) {
// throw exception directly if fetch's retry is not enabled
throw e;
// throw exception directly if fetch's retry is not enabled
throw e;
}
if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
LOG.warn("Failed to connect to host: " + url + "after "
Expand Down Expand Up @@ -489,7 +491,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining,
boolean canRetry) throws IOException {
MapOutput<K,V> mapOutput = null;
MapOutput<K, V> mapOutput = null;
TaskAttemptID mapId = null;
long decompressedLength = -1;
long compressedLength = -1;
Expand Down Expand Up @@ -611,7 +613,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
// First time to retry.
long currentTime = Time.monotonicNow();
if (retryStartTime == 0) {
retryStartTime = currentTime;
retryStartTime = currentTime;
}

// Retry is not timeout, let's do retry with throwing an exception.
Expand All @@ -628,7 +630,7 @@ private void checkTimeoutOrRetry(MapHost host, IOException ioe)
}

/**
* Do some basic verification on the input received -- Being defensive
* Do some basic verification on the input received -- Being defensive.
* @param compressedLength
* @param decompressedLength
* @param forReduce
Expand Down Expand Up @@ -695,8 +697,7 @@ private URL getMapOutputURL(MapHost host, Collection<TaskAttemptID> maps
* only on the last failure. Instead of connecting with a timeout of
* X, we try connecting with a timeout of x < X but multiple times.
*/
private void connect(URLConnection connection, int connectionTimeout)
throws IOException {
private void connect(URLConnection connection, int connectionTimeout) throws IOException {
int unit = 0;
if (connectionTimeout < 0) {
throw new IOException("Invalid timeout "
Expand Down
Expand Up @@ -879,4 +879,9 @@ public int compareTo(Path obj) {
return super.compareTo(obj);
}
}

@VisibleForTesting
OnDiskMerger getOnDiskMerger() {
return onDiskMerger;
}
}
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -109,4 +110,14 @@ public void run() {
}

public abstract void merge(List<T> inputs) throws IOException;

@VisibleForTesting
int getMergeFactor() {
return mergeFactor;
}

@VisibleForTesting
LinkedList<List<T>> getPendingToBeMerged() {
return pendingToBeMerged;
}
}
Expand Up @@ -80,8 +80,6 @@ private static void addDeprecatedKeys() {
JTConfig.JT_TASKCACHE_LEVELS),
new DeprecationDelta("mapred.job.tracker.retire.jobs",
JTConfig.JT_RETIREJOBS),
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
TTConfig.TT_INDEX_CACHE),
new DeprecationDelta("mapred.tasktracker.map.tasks.maximum",
TTConfig.TT_MAP_SLOTS),
new DeprecationDelta("mapred.tasktracker.memory_calculator_plugin",
Expand Down Expand Up @@ -290,6 +288,10 @@ private static void addDeprecatedKeys() {
MRJobConfig.REDUCE_LOG_LEVEL),
new DeprecationDelta("mapreduce.job.counters.limit",
MRJobConfig.COUNTERS_MAX_KEY),
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
MRJobConfig.SHUFFLE_INDEX_CACHE),
new DeprecationDelta("mapreduce.tasktracker.indexcache.mb",
MRJobConfig.SHUFFLE_INDEX_CACHE),
new DeprecationDelta("jobclient.completion.poll.interval",
Job.COMPLETION_POLL_INTERVAL_KEY),
new DeprecationDelta("jobclient.progress.monitor.poll.interval",
Expand Down
Expand Up @@ -30,7 +30,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -56,7 +56,7 @@ public void testLRCPolicy() throws Exception {
r.setSeed(seed);
System.out.println("seed: " + seed);
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
final int partsPerMap = 1000;
final int bytesPerFile = partsPerMap * 24;
IndexCache cache = new IndexCache(conf);
Expand Down Expand Up @@ -127,7 +127,7 @@ public void testLRCPolicy() throws Exception {
public void testBadIndex() throws Exception {
final int parts = 30;
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
IndexCache cache = new IndexCache(conf);

Path f = new Path(p, "badindex");
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testBadIndex() throws Exception {
@Test
public void testInvalidReduceNumberOrLength() throws Exception {
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
final int partsPerMap = 1000;
final int bytesPerFile = partsPerMap * 24;
IndexCache cache = new IndexCache(conf);
Expand Down Expand Up @@ -205,7 +205,7 @@ public void testRemoveMap() throws Exception {
// fails with probability of 100% on code before MAPREDUCE-2541,
// so it is repeatable in practice.
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 10);
// Make a big file so removeMapThread almost surely runs faster than
// getInfoThread
final int partsPerMap = 100000;
Expand Down Expand Up @@ -251,7 +251,7 @@ public void run() {
@Test
public void testCreateRace() throws Exception {
fs.delete(p, true);
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
final int partsPerMap = 1000;
final int bytesPerFile = partsPerMap * 24;
final IndexCache cache = new IndexCache(conf);
Expand Down
Expand Up @@ -23,7 +23,6 @@
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -44,7 +43,6 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import org.apache.hadoop.test.Whitebox;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -217,8 +215,7 @@ public void testIoSortDefaults() {

@SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException,
InterruptedException {
public void testOnDiskMerger() throws IOException {
JobConf jobConf = new JobConf();
final int SORT_FACTOR = 5;
jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
Expand All @@ -229,12 +226,8 @@ public void testOnDiskMerger() throws IOException, URISyntaxException,
new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
, null, null, null, null, null, null, null, null, null, mapOutputFile);

MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
IntWritable, IntWritable>) Whitebox.getInternalState(manager,
"onDiskMerger");
int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
"mergeFactor");
MergeThread onDiskMerger = manager.getOnDiskMerger();
int mergeFactor = onDiskMerger.getMergeFactor();

// make sure the io.sort.factor is set properly
assertEquals(mergeFactor, SORT_FACTOR);
Expand All @@ -252,9 +245,7 @@ public void testOnDiskMerger() throws IOException, URISyntaxException,
}

//Check that the files pending to be merged are in sorted order.
LinkedList<List<CompressAwarePath>> pendingToBeMerged =
(LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
onDiskMerger, "pendingToBeMerged");
LinkedList<List<CompressAwarePath>> pendingToBeMerged = onDiskMerger.getPendingToBeMerged();
assertTrue("No inputs were added to list pending to merge",
pendingToBeMerged.size() > 0);
for(int i = 0; i < pendingToBeMerged.size(); ++i) {
Expand Down

0 comments on commit 61f9165

Please sign in to comment.