Skip to content

Commit

Permalink
MAPREDUCE-7435. following chain through to validation
Browse files Browse the repository at this point in the history
* validation also uses manifest entries (and so works!)
* testing expects this
* tests of IOStats
* tests of new RemoteIterators

Change-Id: I4cfb308d4b08f1f775cfdbe2df6f8ff07ac6bc54
  • Loading branch information
steveloughran committed Apr 17, 2023
1 parent c96a769 commit 1358391
Show file tree
Hide file tree
Showing 24 changed files with 626 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
* {@link IOStatisticsSnapshot} to also support it.
* These are the simple setters, they don't provide for increments,
* decrements, calculation of min/max/mean etc.
* @since The interface and IOStatisticsSnapshot support came after Hadoop 3.3.5
* @since The interface and IOStatisticsSnapshot support was added after Hadoop 3.3.5
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface IOStatisticsSetters {
public interface IOStatisticsSetters extends IOStatistics {

/**
* Set a counter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
@InterfaceStability.Evolving
public final class IOStatisticsSnapshot
implements IOStatistics, Serializable, IOStatisticsAggregator,
IOStatisticsSetters{
IOStatisticsSetters {

private static final long serialVersionUID = -1762522703841538084L;

Expand Down Expand Up @@ -247,7 +247,7 @@ public synchronized void setMinimum(final String key, final long value) {

/**
* Set a mean statistic: the supplied value is stored under the key
* in the map returned by {@code meanStatistics()}.
* NOTE(review): the neighbouring {@code setMinimum()} is declared
* synchronized while this method is not; confirm that is intentional.
* @param key statistic name.
* @param value statistic value to store.
*/
@Override
public void setMeanStatistic(final String key, final MeanStatistic value) {

meanStatistics().put(key, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.statistics.impl;
package org.apache.hadoop.fs.statistics.impl;

import javax.annotation.Nullable;
import java.time.Duration;
Expand All @@ -25,7 +25,6 @@

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

/**
* This may seem odd having an IOStatisticsStore which does nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,11 +739,17 @@ public void close() throws IOException {
private static final class HaltableRemoteIterator<S>
extends WrappingRemoteIterator<S, S> {

/**
* Probe as to whether work should continue.
*/
private final CallableRaisingIOE<Boolean> continueWork;


/**
* Wrap an iterator with one which adds a continuation probe.
* The probe will be called in the {@link #hasNext()} method, before
* the source iterator is itself checked and in {@link #next()}
* before retrieval.
* That is: it may be called multiple times per iteration.
* @param source source iterator.
* @param continueWork predicate which will trigger a fast halt if it returns false.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.statistics;

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hadoop.fs.statistics.impl.ForwardingIOStatisticsStore;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticGauge;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMean;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

/**
 * Parameterized test of the {@link IOStatisticsSetters} implementations.
 * Every test case follows the same sequence: set a statistic, verify it,
 * overwrite it, then verify the new value.
 * Each case finishes by setting a key which is not declared in any of the
 * {@link IOStatisticsStore} instances built by {@link #createTestStore()},
 * to verify that doing so is harmless.
 */
@RunWith(Parameterized.class)
public class TestIOStatisticsSetters extends AbstractHadoopTestBase {

  public static final String COUNTER = "counter";

  public static final String GAUGE = "gauge";

  public static final String MAXIMUM = "max";

  public static final String MINIMUM = "min";

  public static final String MEAN = "mean";

  /** The implementation under test in this parameterized run. */
  private final IOStatisticsSetters stats;

  /**
   * Build the parameter rows: one implementation per row.
   * @return the implementations to test.
   */
  @Parameterized.Parameters
  public static Collection<Object[]> params() {
    final Object[] snapshot = {new IOStatisticsSnapshot()};
    final Object[] store = {createTestStore()};
    final Object[] forwarding = {new ForwardingIOStatisticsStore(createTestStore())};
    return Arrays.asList(snapshot, store, forwarding);
  }

  /**
   * Build a statistics store which declares one statistic of each type,
   * using the key constants of this class.
   * @return the newly built store.
   */
  private static IOStatisticsStore createTestStore() {
    return iostatisticsStore()
        .withCounters(COUNTER)
        .withGauges(GAUGE)
        .withMaximums(MAXIMUM)
        .withMinimums(MINIMUM)
        .withMeanStatistics(MEAN)
        .build();
  }

  /**
   * Constructor invoked by the parameterized runner.
   * @param ioStatisticsSetters implementation to test.
   */
  public TestIOStatisticsSetters(IOStatisticsSetters ioStatisticsSetters) {
    this.stats = ioStatisticsSetters;
  }

  @Test
  public void testCounter() throws Throwable {
    // initial value
    stats.setCounter(COUNTER, 1);
    assertThatStatisticCounter(stats, COUNTER).isEqualTo(1);

    // overwrite
    stats.setCounter(COUNTER, 2);
    assertThatStatisticCounter(stats, COUNTER).isEqualTo(2);

    // a key the test stores do not declare
    stats.setCounter("c2", 3);
  }

  @Test
  public void testMaximum() throws Throwable {
    // initial value
    stats.setMaximum(MAXIMUM, 1);
    assertThatStatisticMaximum(stats, MAXIMUM).isEqualTo(1);

    // overwrite
    stats.setMaximum(MAXIMUM, 2);
    assertThatStatisticMaximum(stats, MAXIMUM).isEqualTo(2);

    // a key the test stores do not declare
    stats.setMaximum("mm2", 3);
  }

  @Test
  public void testMinimum() throws Throwable {
    // initial value
    stats.setMinimum(MINIMUM, 1);
    assertThatStatisticMinimum(stats, MINIMUM).isEqualTo(1);

    // overwrite
    stats.setMinimum(MINIMUM, 2);
    assertThatStatisticMinimum(stats, MINIMUM).isEqualTo(2);

    // a key the test stores do not declare as a minimum
    stats.setMinimum("c2", 3);
  }

  @Test
  public void testGauge() throws Throwable {
    // initial value
    stats.setGauge(GAUGE, 1);
    assertThatStatisticGauge(stats, GAUGE).isEqualTo(1);

    // overwrite
    stats.setGauge(GAUGE, 2);
    assertThatStatisticGauge(stats, GAUGE).isEqualTo(2);

    // a key the test stores do not declare
    stats.setGauge("g2", 3);
  }

  @Test
  public void testMean() throws Throwable {
    final MeanStatistic mean11 = new MeanStatistic(1, 1);
    final MeanStatistic mean22 = new MeanStatistic(2, 2);

    // initial value
    stats.setMeanStatistic(MEAN, mean11);
    assertThatStatisticMean(stats, MEAN).isEqualTo(mean11);

    // overwrite
    stats.setMeanStatistic(MEAN, mean22);
    assertThatStatisticMean(stats, MEAN).isEqualTo(mean22);

    // a key the test stores do not declare
    stats.setMeanStatistic("m2", mean11);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.util.Preconditions;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,6 +40,7 @@
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.RemoteIterators.*;
import static org.apache.hadoop.util.functional.RemoteIterators.haltableRemoteIterator;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -287,6 +291,39 @@ public void testJavaIterableCloseInNextLoop() throws Throwable {

}

@Test
public void testHaltableIterator() throws Throwable {
  final int limit = 4;
  // iterations still permitted; decremented once per element processed
  final AtomicInteger remaining = new AtomicInteger(limit);

  // the source is a countdown of 10, but the continuation probe
  // returns false once "remaining" has dropped to zero,
  // which is expected to halt the iteration early
  final RemoteIterator<Integer> haltable = haltableRemoteIterator(
      new CountdownRemoteIterator(10),
      () -> remaining.get() > 0);

  final long processed = foreach(haltable,
      (v) -> remaining.decrementAndGet());
  Assertions.assertThat(processed)
      .describedAs("Count of iterations")
      .isEqualTo(limit);
}
@Test
public void testHaltableIteratorNoHalt() throws Throwable {

  // a countdown of 10 whose continuation probe always returns true,
  // so every element of the source is expected to be processed
  final RemoteIterator<Integer> unhalted = haltableRemoteIterator(
      new CountdownRemoteIterator(10),
      () -> true);

  final long processed = foreach(unhalted, (v) -> { });
  Assertions.assertThat(processed)
      .describedAs("Count of iterations")
      .isEqualTo(10);
}

/**
* assert that the string value of an object contains the
* expected text.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
*/
private final boolean deleteTargetPaths;

/**
* Entry writer queue capacity.
* Read from the configuration option {@code OPT_WRITER_QUEUE_CAPACITY},
* falling back to {@code DEFAULT_WRITER_QUEUE_CAPACITY} if unset.
*/
private final int writerQueueCapacity;

/**
* Constructor.
* @param outputPath destination path of the job.
Expand Down Expand Up @@ -190,6 +195,9 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
this.deleteTargetPaths = conf.getBoolean(
OPT_DELETE_TARGET_FILES,
OPT_DELETE_TARGET_FILES_DEFAULT);
this.writerQueueCapacity = conf.getInt(
OPT_WRITER_QUEUE_CAPACITY,
DEFAULT_WRITER_QUEUE_CAPACITY);

// if constructed with a task attempt, build the task ID and path.
if (context instanceof TaskAttemptContext) {
Expand Down Expand Up @@ -252,6 +260,7 @@ StageConfig createStageConfig() {
StageConfig stageConfig = new StageConfig();
stageConfig
.withConfiguration(conf)
.withDeleteTargetPaths(deleteTargetPaths)
.withIOStatistics(iostatistics)
.withJobAttemptNumber(jobAttemptNumber)
.withJobDirectories(dirs)
Expand All @@ -263,8 +272,7 @@ StageConfig createStageConfig() {
.withTaskAttemptDir(taskAttemptDir)
.withTaskAttemptId(taskAttemptId)
.withTaskId(taskId)
.withDeleteTargetPaths(deleteTargetPaths);

.withWriterQueueCapacity(writerQueueCapacity);
return stageConfig;
}

Expand Down Expand Up @@ -324,6 +332,14 @@ public String getName() {
return name;
}

/**
 * Query the capacity of the entry writer queue.
 * @return the value of the writer queue capacity field.
 */
public int getWriterQueueCapacity() {
  return this.writerQueueCapacity;
}

@Override
public IOStatisticsStore getIOStatistics() {
return iostatistics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public final class ManifestCommitterConstants {
/**
* Default value: {@value}.
*/
public static final int OPT_IO_PROCESSORS_DEFAULT = 64;
public static final int OPT_IO_PROCESSORS_DEFAULT = 32;

/**
* Directory for saving job summary reports.
Expand Down Expand Up @@ -240,6 +240,26 @@ public final class ManifestCommitterConstants {
public static final String CAPABILITY_DYNAMIC_PARTITIONING =
"mapreduce.job.committer.dynamic.partitioning";


/**
* Queue capacity between task manifest loading and the entry file writer.
* If more than this number of manifest lists are waiting to be written,
* the enqueue operation blocks.
* There's an expectation that writing to the local file is a lot faster
* than the parallelized buffer reads, and therefore that this queue can
* be emptied at the same rate it is filled.
* Value {@value}.
*/
public static final String OPT_WRITER_QUEUE_CAPACITY =
OPT_PREFIX + "writer.queue.capacity";


/**
* Default value of {@link #OPT_WRITER_QUEUE_CAPACITY}.
* Value {@value}.
*/
public static final int DEFAULT_WRITER_QUEUE_CAPACITY = 32;

private ManifestCommitterConstants() {
}

Expand Down

0 comments on commit 1358391

Please sign in to comment.