From f724c546069885e29e6446813805bb63bf0d5d9d Mon Sep 17 00:00:00 2001 From: Ramesh Kumar Date: Wed, 23 Mar 2022 21:34:47 -0700 Subject: [PATCH] TEZ-4397 Open Tez Input splits asynchronously Contributed by Ramesh Kumar Thangarajan --- .../split/TezGroupedSplitsInputFormat.java | 72 +++++++++++++++++-- .../mapreduce/grouper/TezSplitGrouper.java | 11 +++ 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java index 61ba560300..6266ec1bcf 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java @@ -19,8 +19,16 @@ package org.apache.hadoop.mapred.split; import java.io.IOException; +import java.util.LinkedList; import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -129,14 +137,58 @@ public class TezGroupedSplitsRecordReader implements RecordReader { int idx = 0; long progress; RecordReader curReader; - + final AtomicInteger initIndex; + final int numReaders; + final ExecutorService initReaderExecService; + final Queue>> initedReaders; + public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job, Reporter reporter) throws IOException { this.groupedSplit = split; this.job = job; this.reporter = reporter; + this.initIndex = new AtomicInteger(0); + int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS, + TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT); + this.numReaders = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS, + TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT); + this.initReaderExecService = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.MAX_PRIORITY) + .setNameFormat("TEZ-Split-Init-Thread-%d") + .build()); + this.initedReaders = new LinkedList<>(); + preInitReaders(); initNextRecordReader(); } + + private void preInitReaders() { + if (initReaderExecService == null) { + LOG.info("Init record reader threadpool is not initialized"); + return; + } + for (int i = 0; i < numReaders; i++) { + initedReaders.offer(this.initReaderExecService.submit(() -> { + try { + int index = initIndex.getAndIncrement(); + if (index >= groupedSplit.wrappedSplits.size()) { + return null; + } + InputSplit s = groupedSplit.wrappedSplits.get(index); + RecordReader reader = wrappedInputFormat.getRecordReader(s, job, reporter); + LOG.debug("Init Thread processed reader number {} initialization", index); + return reader; + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + cancelsFutures(); + throw new RuntimeException(e); + } + })); + } + } @Override public boolean next(K key, V value) throws IOException { @@ -183,6 +235,8 @@ protected boolean initNextRecordReader() throws IOException { // if all chunks have been processed, nothing more to do. if (idx == groupedSplit.wrappedSplits.size()) { + LOG.info("Shutting down the init record reader threadpool"); + initReaderExecService.shutdownNow(); return false; } @@ -193,15 +247,25 @@ protected boolean initNextRecordReader() throws IOException { // get a record reader for the idx-th chunk try { - curReader = wrappedInputFormat.getRecordReader( - groupedSplit.wrappedSplits.get(idx), job, reporter); + curReader = initedReaders.poll().get(); + preInitReaders(); } catch (Exception e) { - throw new RuntimeException (e); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + cancelsFutures(); + throw new RuntimeException(e); } idx++; return true; } + private void cancelsFutures() { + for (Future> f : initedReaders) { + f.cancel(true); + } + } + @Override public long getPos() throws IOException { long subprogress = 0; // bytes processed in current split diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java index a1d6b6c806..3b2f17d1ff 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java @@ -102,6 +102,17 @@ public abstract class TezSplitGrouper { public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only"; public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false; + /** + * Number of threads used to initialize the grouped splits, to asynchronously open the readers. + */ + public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads"; + public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4; + + /** + * Number of record readers to asynchronously and proactively init. + */ + public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders"; + public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 10; static class LocationHolder { List splits;