Skip to content

Commit

Permalink
TEZ-4397 Open Tez Input splits asynchronously
Browse files Browse the repository at this point in the history
Contributed by Ramesh Kumar Thangarajan
  • Loading branch information
ramesh0201 committed Mar 24, 2022
1 parent 20873a3 commit f724c54
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@
package org.apache.hadoop.mapred.split;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand Down Expand Up @@ -129,14 +137,58 @@ public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
int idx = 0;
long progress;
RecordReader<K, V> curReader;

final AtomicInteger initIndex;
final int numReaders;
final ExecutorService initReaderExecService;
final Queue<Future<RecordReader<K,V>>> initedReaders;

/**
 * Creates a record reader over all the wrapped splits of {@code split}.
 *
 * <p>Sets up a fixed-size pool of daemon threads that open the wrapped record readers
 * asynchronously, submits the first batch of reader initializations and then positions
 * this reader on the first wrapped split.
 *
 * <p>The pool size and the number of readers to pre-initialize are read from {@code conf}
 * (resolved from the enclosing scope, not from the {@code job} parameter) using the
 * {@code TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_*} keys.
 *
 * @param split the grouped split whose wrapped splits are read one after another
 * @param job the job configuration handed to the wrapped input format
 * @param reporter the reporter handed to the wrapped input format
 * @throws IOException if positioning on the first record reader fails with an I/O error
 */
public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
    Reporter reporter) throws IOException {
  this.groupedSplit = split;
  this.job = job;
  this.reporter = reporter;
  this.initIndex = new AtomicInteger(0);
  // Executors.newFixedThreadPool rejects a non-positive size with IllegalArgumentException,
  // so a misconfigured value falls back to a single init thread instead of failing the task.
  int numThreads = Math.max(1, conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
      TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT));
  this.numReaders = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
      TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT);
  this.initReaderExecService = Executors.newFixedThreadPool(numThreads,
      new ThreadFactoryBuilder()
          .setDaemon(true)
          .setPriority(Thread.MAX_PRIORITY)
          .setNameFormat("TEZ-Split-Init-Thread-%d")
          .build());
  this.initedReaders = new LinkedList<>();
  try {
    preInitReaders();
    initNextRecordReader();
  } catch (IOException | RuntimeException e) {
    // The object is never handed to the caller when construction fails, so nothing else
    // would ever stop the pool: shut it down here to avoid leaking its idle threads.
    initReaderExecService.shutdownNow();
    throw e;
  }
}

/**
 * Tops up the queue of asynchronously opened record readers.
 *
 * <p>Submits reader-initialization tasks to the init thread pool until either
 * {@code numReaders} futures are pending in {@code initedReaders} or every wrapped split
 * has been handed out. Futures are queued in split order: the n-th future ever offered to
 * the queue always opens the n-th wrapped split, so a consumer polling the queue sees the
 * readers in the same order as the splits and never receives a {@code null} reader.
 *
 * <p>Must only be called from the thread that consumes {@code initedReaders}; the queue is
 * not thread-safe.
 */
private void preInitReaders() {
  if (initReaderExecService == null) {
    LOG.info("Init record reader threadpool is not initialized");
    return;
  }
  final int numSplits = groupedSplit.wrappedSplits.size();
  // Keep at least one reader pending so the consumer always finds a future to poll,
  // even when the configured look-ahead is zero or negative.
  final int lookAhead = Math.max(1, numReaders);
  while (initedReaders.size() < lookAhead && initIndex.get() < numSplits) {
    // Claim the split index here, on the submitting thread, rather than inside the task.
    // If the tasks raced for indexes, an earlier-queued future could lose the race near
    // the end of the split list and yield null while a later future holds a real reader.
    final int index = initIndex.getAndIncrement();
    final InputSplit s = groupedSplit.wrappedSplits.get(index);
    initedReaders.offer(this.initReaderExecService.submit(() -> {
      try {
        RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
        LOG.debug("Init Thread processed reader number {} initialization", index);
        return reader;
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // Do not walk initedReaders from this worker thread: the queue is a plain
        // LinkedList that the consumer thread mutates concurrently. The failure reaches
        // the consumer through Future.get(), which can then cancel the remaining futures.
        throw new RuntimeException(e);
      }
    }));
  }
}

@Override
public boolean next(K key, V value) throws IOException {
Expand Down Expand Up @@ -183,6 +235,8 @@ protected boolean initNextRecordReader() throws IOException {

// if all chunks have been processed, nothing more to do.
if (idx == groupedSplit.wrappedSplits.size()) {
LOG.info("Shutting down the init record reader threadpool");
initReaderExecService.shutdownNow();
return false;
}

Expand All @@ -193,15 +247,25 @@ protected boolean initNextRecordReader() throws IOException {

// get a record reader for the idx-th chunk
try {
curReader = wrappedInputFormat.getRecordReader(
groupedSplit.wrappedSplits.get(idx), job, reporter);
curReader = initedReaders.poll().get();
preInitReaders();
} catch (Exception e) {
throw new RuntimeException (e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
cancelsFutures();
throw new RuntimeException(e);
}
idx++;
return true;
}

/**
 * Requests cancellation of every reader-initialization future currently held in
 * {@code initedReaders}, interrupting tasks that are already running.
 * The futures themselves stay in the queue.
 */
private void cancelsFutures() {
  initedReaders.forEach(pending -> pending.cancel(true));
}

@Override
public long getPos() throws IOException {
long subprogress = 0; // bytes processed in current split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ public abstract class TezSplitGrouper {
public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;

/**
 * Configuration key for the number of threads in the pool that asynchronously opens the
 * record readers of a grouped split. Defaults to
 * {@link #TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT}.
 */
public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads";
/** Default number of threads used to open record readers asynchronously. */
public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4;

/**
 * Configuration key for the number of record readers to initialize asynchronously and
 * proactively, i.e. before they are needed for reading. Defaults to
 * {@link #TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT}.
 */
public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders";
/** Default number of record readers to initialize proactively. */
public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 10;

static class LocationHolder {
List<SplitContainer> splits;
Expand Down

0 comments on commit f724c54

Please sign in to comment.