From cc8249e932c8c5b117c901188dbd92bf6f8d5391 Mon Sep 17 00:00:00 2001 From: csjuhasz-c <66361392+csjuhasz-c@users.noreply.github.com> Date: Mon, 17 Jan 2022 14:15:27 +0100 Subject: [PATCH] TEZ-4340: Show convenient input -> output vertex names in input messages (#170) (Csaba Juhasz reviewed by Laszlo Bodor) --- .../apache/tez/runtime/api/InputContext.java | 6 ++ .../apache/tez/mapreduce/input/MRInput.java | 16 ++-- .../tez/mapreduce/input/MRInputLegacy.java | 4 +- .../tez/mapreduce/input/MultiMRInput.java | 10 +-- .../runtime/api/impl/TezInputContextImpl.java | 5 ++ .../library/common/shuffle/Fetcher.java | 44 ++++----- .../shuffle/FetcherWithInjectableErrors.java | 16 ++-- .../impl/ShuffleInputEventHandlerImpl.java | 2 +- .../common/shuffle/impl/ShuffleManager.java | 62 ++++++------- .../orderedgrouped/FetcherOrderedGrouped.java | 16 ++-- ...herOrderedGroupedWithInjectableErrors.java | 18 ++-- .../shuffle/orderedgrouped/MergeManager.java | 17 ++-- .../shuffle/orderedgrouped/Shuffle.java | 35 ++++---- ...huffleInputEventHandlerOrderedGrouped.java | 2 +- .../orderedgrouped/ShuffleScheduler.java | 18 ++-- .../library/input/OrderedGroupedKVInput.java | 4 +- .../library/input/UnorderedKVInput.java | 4 +- .../library/common/shuffle/TestFetcher.java | 25 ++++-- .../TestShuffleInputEventHandlerImpl.java | 1 + .../shuffle/impl/TestShuffleManager.java | 1 + .../shuffle/orderedgrouped/TestFetcher.java | 90 ++++++++----------- .../shuffle/orderedgrouped/TestShuffle.java | 1 + .../java/org/apache/tez/test/TestInput.java | 3 +- 23 files changed, 209 insertions(+), 191 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java index 479a7dba1b..6eac2dfc95 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java @@ -32,6 +32,12 @@ public interface InputContext extends TaskContext { * @return Name of the Source Vertex */ public String getSourceVertexName(); + + /** + * Returns a convenient, human-readable string describing the input and output vertices. + * @return the convenient string + */ + String getInputOutputVertexNames(); /** * Get the index of the input in the set of all inputs for the task. The diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 964c0b8301..e64d273b44 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -461,7 +461,7 @@ public List initialize() throws IOException { getContext().inputIsReady(); this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT); - LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi + + LOG.info(getContext().getInputOutputVertexNames() + " using newmapreduce API=" + useNewApi + ", split via event=" + splitInfoViaEvents + ", numPhysicalInputs=" + getNumPhysicalInputs()); initializeInternal(); @@ -526,7 +526,7 @@ inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(), } finally { rrLock.unlock(); } - LOG.info("Initialized MRInput: " + getContext().getSourceVertexName()); + LOG.info("Initialized MRInput: " + getContext().getInputOutputVertexNames()); } /** @@ -634,7 +634,7 @@ void processSplitEvent(InputDataInformationEvent event) try { initFromEventInternal(event); if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " notifying on RecordReader initialized"); + LOG.debug(getContext().getInputOutputVertexNames() + " notifying on RecordReader initialized"); } rrInited.signal(); } finally { @@ -647,7 +647,7 @@ void checkAndAwaitRecordReaderInitialization() throws IOException { rrLock.lock(); try { if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " awaiting RecordReader initialization"); + LOG.debug(getContext().getInputOutputVertexNames() + " awaiting RecordReader initialization"); } rrInited.await(); } catch (Exception e) { @@ -671,7 +671,7 @@ void initFromEvent(InputDataInformationEvent initEvent) private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " initializing RecordReader from event"); + LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event"); } Objects.requireNonNull(initEvent, "InitEvent must be specified"); MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload())); @@ -686,7 +686,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I LOG.warn("Thread interrupted while getting split length: ", e); } if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength); } @@ -696,7 +696,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I splitObj = split; splitLength = split.getLength(); if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength); } } @@ -705,7 +705,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I .increment(splitLength); } mrReader.setSplit(splitObj); - LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event"); + LOG.info(getContext().getInputOutputVertexNames() + " initialized RecordReader from event"); } private static class MRInputHelpersInternal extends MRInputHelpers { diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java index 70be7ee444..bbf145eb6c 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java @@ -74,7 +74,7 @@ public MRInputLegacy(InputContext inputContext, int numPhysicalInputs) { @Private protected void initializeInternal() throws IOException { - LOG.info(getContext().getSourceVertexName() + " MRInputLegacy deferring initialization"); + LOG.info(getContext().getInputOutputVertexNames() + " MRInputLegacy deferring initialization"); } @Private @@ -136,7 +136,7 @@ void checkAndAwaitRecordReaderInitialization() throws IOException { if (splitInfoViaEvents && !inited) { if (initEvent == null) { if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + + LOG.debug(getContext().getInputOutputVertexNames() + " awaiting init event before initializing record reader"); } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java index 3bc8629565..4a98052211 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java @@ -110,7 +110,7 @@ private MultiMRInputConfigBuilder(Configuration conf, Class inputFormat) { @Override public List initialize() throws IOException { super.initialize(); - LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi + + LOG.info(getContext().getInputOutputVertexNames() + " using newmapreduce API=" + useNewApi + ", numPhysicalInputs=" + getNumPhysicalInputs()); if (getNumPhysicalInputs() == 0) { getContext().inputIsReady(); @@ -167,7 +167,7 @@ public void handleEvents(List inputEvents) throws Exception { private MRReader initFromEvent(InputDataInformationEvent event) throws IOException { Objects.requireNonNull(event, "Event must be specified"); if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + eventCount.get()); + LOG.debug(getContext().getInputOutputVertexNames() + " initializing Reader: " + eventCount.get()); } MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload())); MRReader reader = null; @@ -186,7 +186,7 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti .getApplicationId().getId(), getContext().getTaskIndex(), getContext() .getTaskAttemptNumber(), getContext()); if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " + split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength); } } else { @@ -196,7 +196,7 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti reader = new MRReaderMapred(localJobConf, split, getContext().getCounters(), inputRecordCounter, getContext()); if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " + + LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " + split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength); } } @@ -204,7 +204,7 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES) .increment(splitLength); } - LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event"); + LOG.info(getContext().getInputOutputVertexNames() + " initialized RecordReader from event"); return reader; } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index f28573a649..1c1c10bf86 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -152,6 +152,11 @@ public String getSourceVertexName() { return sourceVertexName; } + @Override + public String getInputOutputVertexNames() { + return String.format("%s -> %s", getSourceVertexName(), getTaskVertexName()); + } + @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 6039df3a2f..eb34ec2993 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -58,7 +58,7 @@ import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; @@ -69,6 +69,7 @@ import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type; import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError; import org.apache.tez.common.Preconditions; +import org.apache.tez.common.TezUtilsInternal; /** * Responsible for fetching inputs served by the ShuffleHandler for a single @@ -194,8 +195,8 @@ public String getHost() { private final boolean isDebugEnabled = LOG.isDebugEnabled(); protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, - FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, - JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, + FetchedInputAllocator inputManager, InputContext inputContext, + JobTokenSecretManager jobTokenSecretManager, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, @@ -208,8 +209,8 @@ protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, this.fetcherCallback = fetcherCallback; this.inputManager = inputManager; this.jobTokenSecretMgr = jobTokenSecretManager; - this.appId = appId; - this.dagIdentifier = dagIdentifier; + this.appId = inputContext.getApplicationId(); + this.dagIdentifier = inputContext.getDagIdentifier(); this.pathToAttemptMap = new HashMap(); this.httpConnectionParams = params; this.conf = conf; @@ -218,7 +219,10 @@ protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, this.sharedFetchEnabled = sharedFetchEnabled; this.fetcherIdentifier = fetcherIdGen.getAndIncrement(); - this.logIdentifier = " fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier; + + String sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> " + + TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName()); + this.logIdentifier = " fetcher [" + sourceDestNameTrimmed +"] " + fetcherIdentifier; this.localFs = localFs; this.localDirAllocator = localDirAllocator; @@ -1133,31 +1137,29 @@ public static class FetcherBuilder { private boolean workAssigned = false; public FetcherBuilder(FetcherCallback fetcherCallback, - HttpConnectionParams params, FetchedInputAllocator inputManager, - ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, - Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort, - boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) { - this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, - jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, + HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext, + JobTokenSecretManager jobTokenSecretMgr, Configuration conf, boolean localDiskFetchEnabled, + String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) { + this.fetcher = new Fetcher(fetcherCallback, params, inputManager, inputContext, + jobTokenSecretMgr, conf, null, null, null, localDiskFetchEnabled, false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch); } public FetcherBuilder(FetcherCallback fetcherCallback, - HttpConnectionParams params, FetchedInputAllocator inputManager, - ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, - Configuration conf, RawLocalFileSystem localFs, + HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext, + JobTokenSecretManager jobTokenSecretMgr, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch, - boolean enableFetcherTestingErrors, ObjectRegistry objectRegistry) { + boolean enableFetcherTestingErrors) { if (enableFetcherTestingErrors) { - this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, appId, dagIdentifier, - jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, + this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, inputContext, + jobTokenSecretMgr, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, - verifyDiskChecksum, compositeFetch, objectRegistry); + verifyDiskChecksum, compositeFetch); } else { - this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, - jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, + this.fetcher = new Fetcher(fetcherCallback, params, inputManager, inputContext, + jobTokenSecretMgr, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java index 951adf9f5d..cf53a57a9e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetcherWithInjectableErrors.java @@ -20,10 +20,10 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.http.HttpConnectionParams; -import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +35,16 @@ public class FetcherWithInjectableErrors extends Fetcher { private String srcNameTrimmed; protected FetcherWithInjectableErrors(FetcherCallback fetcherCallback, HttpConnectionParams params, - FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, - JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, + FetchedInputAllocator inputManager, InputContext inputContext, + JobTokenSecretManager jobTokenSecretManager, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, - boolean compositeFetch, ObjectRegistry objectRegistry) { - super(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretManager, srcNameTrimmed, conf, + boolean compositeFetch) { + super(fetcherCallback, params, inputManager, inputContext, jobTokenSecretManager, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch); - this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, objectRegistry); - this.srcNameTrimmed = srcNameTrimmed; + this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, inputContext.getObjectRegistry()); + this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); LOG.info("Initialized FetcherWithInjectableErrors with config: {}", fetcherErrorTestingConfig); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index bcb7bb58ea..ca1259f30a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -161,7 +161,7 @@ private void handleEvent(Event event) throws IOException { @Override public void logProgress(boolean updateOnClose) { - LOG.info(inputContext.getSourceVertexName() + ": " + LOG.info(inputContext.getInputOutputVertexNames() + ": " + "numDmeEventsSeen=" + numDmeEvents.get() + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get() + ", numObsoletionEventsSeen=" + numObsoletionEvents.get() diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index fa539c8374..769ac68f7e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -185,7 +185,7 @@ public class ShuffleManager implements FetcherCallback { */ private final int maxTimeToWaitForReportMillis; - private final String srcNameTrimmed; + private final String sourceDestNameTrimmed; private final int maxTaskOutputAtOnce; @@ -264,8 +264,9 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT); - this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); - + this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> " + + TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName()); + completedInputSet = new BitSet(numInputs); /** * In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt. @@ -288,15 +289,15 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) { fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers, - "Fetcher_B {" + srcNameTrimmed + "} #%d"); + "Fetcher_B {" + sourceDestNameTrimmed + "} #%d"); } else { fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build()); + .setDaemon(true).setNameFormat("Fetcher_B {" + sourceDestNameTrimmed + "} #%d").build()); } this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor); ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}").build()); + .setDaemon(true).setNameFormat("ShuffleRunner {" + sourceDestNameTrimmed + "}").build()); this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor); this.schedulerCallable = new RunShuffleCallable(conf); @@ -336,7 +337,7 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu shuffleInfoEventsMap = new ConcurrentHashMap(); - LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec=" + LOG.info(sourceDestNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec=" + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers=" + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled=" + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", " @@ -361,7 +362,7 @@ public void run() throws IOException { if (maxTimeToWaitForReportMillis > 0) { reporterExecutor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}") + .setNameFormat("ShuffleRunner {" + sourceDestNameTrimmed + "}") .build()); Future reporterFuture = reporterExecutor.submit(new ReporterCallable()); } @@ -446,7 +447,7 @@ protected Void callInternal() throws Exception { break; } - LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, numCompletedInputs); + LOG.debug("{}: NumCompletedInputs: {}", sourceDestNameTrimmed, numCompletedInputs); if (numCompletedInputs.get() < numInputs && !isShutdown.get()) { lock.lock(); try { @@ -458,7 +459,8 @@ protected Void callInternal() throws Exception { inputHost = pendingHosts.take(); } catch (InterruptedException e) { if (isShutdown.get()) { - LOG.info(srcNameTrimmed + ": " + "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); + LOG.info(sourceDestNameTrimmed + ": " + + "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); Thread.currentThread().interrupt(); break; } else { @@ -466,14 +468,14 @@ protected Void callInternal() throws Exception { } } if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + + LOG.debug(sourceDestNameTrimmed + ": " + "Processing pending host: " + inputHost.toDetailedString()); } if (inputHost.getNumPendingPartitions() > 0 && !isShutdown.get()) { Fetcher fetcher = constructFetcherForHost(inputHost, conf); runningFetchers.add(fetcher); if (isShutdown.get()) { - LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown," + + LOG.info(sourceDestNameTrimmed + ": " + "hasBeenShutdown," + "Breaking out of ShuffleScheduler Loop"); break; } @@ -485,7 +487,7 @@ protected Void callInternal() throws Exception { } } else { if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Skipping host: " + + LOG.debug(sourceDestNameTrimmed + ": " + "Skipping host: " + inputHost.getIdentifier() + " since it has no inputs to process"); } @@ -497,7 +499,8 @@ protected Void callInternal() throws Exception { } } shufflePhaseTime.setValue(System.currentTimeMillis() - startTime); - LOG.info(srcNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted()); + LOG.info(sourceDestNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " + + Thread.currentThread().isInterrupted()); if (!fetcherExecutor.isShutdown()) { fetcherExecutor.shutdownNow(); } @@ -540,16 +543,15 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { if (sharedFetchEnabled) { // pick a single lock disk from the edge name's hashcode + host hashcode - final int h = Math.abs(Objects.hashCode(this.srcNameTrimmed, inputHost.getHost())); + final int h = Math.abs(Objects.hashCode(this.sourceDestNameTrimmed, inputHost.getHost())); lockDisk = new Path(this.localDisks[h % this.localDisks.length], "locks"); } FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this, - httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(), - jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, + httpConnectionParams, inputManager, inputContext, + jobTokenSecretMgr, conf, localFs, localDirAllocator, lockDisk, localDiskFetchEnabled, sharedFetchEnabled, - localhostName, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch, enableFetcherTestingErrors, - inputContext.getObjectRegistry()); + localhostName, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch, enableFetcherTestingErrors); if (codec != null) { fetcherBuilder.setCompressionParameters(codec); @@ -632,7 +634,7 @@ public void addKnownInput(String hostName, int port, } } if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Adding input: " + + LOG.debug(sourceDestNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host: " + host); } @@ -878,7 +880,7 @@ private void adjustCompletedInputs(FetchedInput fetchedInput) { if (fetchedInput instanceof NullFetchedInput) { completedInputs.add(fetchedInput); } - LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName()); + LOG.info("All inputs fetched for input vertex : " + inputContext.getInputOutputVertexNames()); } } finally { lock.unlock(); @@ -950,7 +952,7 @@ public void fetchFailed(String host, LOG.info( "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, " + "local fetch: {}, remote fetch failure reported as local failure: {})", - srcNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed, + sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed, inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource()); failedShufflesCounter.increment(1); inputContext.notifyProgress(); @@ -988,11 +990,11 @@ public void shutdown() throws InterruptedException { if (Thread.currentThread().isInterrupted()) { //TODO: need to cleanup all FetchedInput (DiskFetchedInput, LocalDisFetchedInput), lockFile //As of now relying on job cleanup (when all directories would be cleared) - LOG.info(srcNameTrimmed + ": " + "Thread interrupted. Need to cleanup the local dirs"); + LOG.info(sourceDestNameTrimmed + ": " + "Thread interrupted. Need to cleanup the local dirs"); } if (!isShutdown.getAndSet(true)) { // Shut down any pending fetchers - LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": " + LOG.info("Shutting down pending fetchers on source" + sourceDestNameTrimmed + ": " + runningFetchers.size()); lock.lock(); try { @@ -1140,15 +1142,15 @@ private class SchedulerFutureCallback implements FutureCallback { @Override public void onSuccess(Void result) { - LOG.info(srcNameTrimmed + ": " + "Scheduler thread completed"); + LOG.info(sourceDestNameTrimmed + ": " + "Scheduler thread completed"); } @Override public void onFailure(Throwable t) { if (isShutdown.get()) { - LOG.debug("{}: Already shutdown. Ignoring error.", srcNameTrimmed, t); + LOG.debug("{}: Already shutdown. Ignoring error.", sourceDestNameTrimmed, t); } else { - LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t); + LOG.error(sourceDestNameTrimmed + ": " + "Scheduler failed with error: ", t); inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle Scheduler Failed"); } } @@ -1177,7 +1179,7 @@ private void doBookKeepingForFetcherComplete() { public void onSuccess(FetchResult result) { fetcher.shutdown(); if (isShutdown.get()) { - LOG.debug("{}: Already shutdown. Ignoring event from fetcher", srcNameTrimmed); + LOG.debug("{}: Already shutdown. Ignoring event from fetcher", sourceDestNameTrimmed); } else { Iterable pendingInputs = result.getPendingInputs(); if (pendingInputs != null && pendingInputs.iterator().hasNext()) { @@ -1200,9 +1202,9 @@ public void onFailure(Throwable t) { // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down. fetcher.shutdown(); if (isShutdown.get()) { - LOG.debug("{}: Already shutdown. Ignoring error from fetcher.", srcNameTrimmed, t); + LOG.debug("{}: Already shutdown. Ignoring error from fetcher.", sourceDestNameTrimmed, t); } else { - LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t); + LOG.error(sourceDestNameTrimmed + ": " + "Fetcher failed with error: ", t); shuffleError = t; inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Fetch failed"); doBookKeepingForFetcherComplete(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 2725debb56..5887dcb3ca 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -42,8 +42,10 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; @@ -121,7 +123,6 @@ public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams, boolean localDiskFetchEnabled, String localHostname, int shufflePort, - String srcNameTrimmed, MapHost mapHost, TezCounter ioErrsCounter, TezCounter wrongLengthErrsCounter, @@ -129,12 +130,11 @@ public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams, TezCounter wrongMapErrsCounter, TezCounter connectionErrsCounter, TezCounter wrongReduceErrsCounter, - String applicationId, - int dagId, boolean asyncHttp, boolean sslShuffle, boolean verifyDiskChecksum, - boolean compositeFetch) { + boolean compositeFetch, + InputContext inputContext) { this.scheduler = scheduler; this.allocator = allocator; this.exceptionReporter = exceptionReporter; @@ -149,8 +149,8 @@ public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams, this.badIdErrs = badIdErrsCounter; this.connectionErrs = connectionErrsCounter; this.wrongReduceErrs = wrongReduceErrsCounter; - this.applicationId = applicationId; - this.dagId = dagId; + this.applicationId = inputContext.getApplicationId().toString(); + this.dagId = inputContext.getDagIdentifier(); this.ifileReadAhead = ifileReadAhead; this.ifileReadAheadLength = ifileReadAheadLength; @@ -171,7 +171,9 @@ public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams, this.verifyDiskChecksum = verifyDiskChecksum; this.compositeFetch = compositeFetch; - this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id; + String sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> " + + TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName()); + this.logIdentifier = "fetcher [" + sourceDestNameTrimmed + "] #" + id; } @VisibleForTesting diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java index 94172d12b9..9c782f6585 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGroupedWithInjectableErrors.java @@ -19,10 +19,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.http.HttpConnectionParams; -import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetcherErrorTestingConfig; import org.slf4j.Logger; @@ -38,17 +39,16 @@ public FetcherOrderedGroupedWithInjectableErrors(HttpConnectionParams httpConnec ShuffleScheduler scheduler, FetchedInputAllocatorOrderedGrouped allocator, ExceptionReporter exceptionReporter, JobTokenSecretManager jobTokenSecretMgr, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, Configuration conf, RawLocalFileSystem localFs, boolean localDiskFetchEnabled, String localHostname, - int shufflePort, String srcNameTrimmed, MapHost mapHost, TezCounter ioErrsCounter, + int shufflePort, MapHost mapHost, TezCounter ioErrsCounter, TezCounter wrongLengthErrsCounter, TezCounter badIdErrsCounter, TezCounter wrongMapErrsCounter, - TezCounter connectionErrsCounter, TezCounter wrongReduceErrsCounter, String applicationId, int dagId, - boolean asyncHttp, boolean sslShuffle, boolean verifyDiskChecksum, boolean compositeFetch, - ObjectRegistry objectRegistry) { + TezCounter connectionErrsCounter, TezCounter wrongReduceErrsCounter, boolean asyncHttp, + boolean sslShuffle, boolean verifyDiskChecksum, boolean compositeFetch, InputContext inputContext) { super(httpConnectionParams, scheduler, allocator, exceptionReporter, jobTokenSecretMgr, ifileReadAhead, - ifileReadAheadLength, codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, + ifileReadAheadLength, codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, - wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, verifyDiskChecksum, compositeFetch); - this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, objectRegistry); - this.srcNameTrimmed = srcNameTrimmed; + wrongReduceErrsCounter, asyncHttp, sslShuffle, verifyDiskChecksum, compositeFetch, inputContext); + this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, inputContext.getObjectRegistry()); + this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); LOG.info("Initialized FetcherOrderedGroupedWithInjectableErrors with config: {}", fetcherErrorTestingConfig); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 46360e1287..8d3f37dbcd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -250,7 +250,7 @@ public MergeManager(Configuration conf, if (LOG.isDebugEnabled()) { LOG.debug( - inputContext.getSourceVertexName() + ": " + "InitialRequest: ShuffleMem=" + memLimit + + inputContext.getInputOutputVertexNames() + ": " + "InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem=" @@ -285,7 +285,7 @@ public MergeManager(Configuration conf, conf.getFloat( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT)); - LOG.info(inputContext.getSourceVertexName() + ": MergerManager: memoryLimit=" + memoryLimit + ", " + + LOG.info(inputContext.getInputOutputVertexNames() + ": MergerManager: memoryLimit=" + memoryLimit + ", " + "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " + "mergeThreshold=" + mergeThreshold + ", " + "ioSortFactor=" + ioSortFactor + ", " + @@ -549,8 +549,9 @@ private void trackAndLogCloseInMemoryFile(MapOutput mapOutput) { private void startMemToDiskMerge() { synchronized (inMemoryMerger) { if (!inMemoryMerger.isInProgress()) { - LOG.info(inputContext.getSourceVertexName() + ": " + "Starting inMemoryMerger's merge since commitMemory=" + - commitMemory + " > mergeThreshold=" + mergeThreshold + + LOG.info(inputContext.getInputOutputVertexNames() + ": " + + "Starting inMemoryMerger's merge since commitMemory=" + commitMemory + + " > mergeThreshold=" + mergeThreshold + ". Current usedMemory=" + usedMemory); inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); inMemoryMergedMapOutputs.clear(); @@ -788,8 +789,8 @@ public void merge(List inputs) throws IOException, InterruptedExcepti Writer writer = new InMemoryWriter(mergedMapOutputs.getMemory()); - LOG.info(inputContext.getSourceVertexName() + ": " + "Initiating Memory-to-Memory merge with " + noInMemorySegments + - " segments of total-size: " + mergeOutputSize); + LOG.info(inputContext.getInputOutputVertexNames() + ": " + "Initiating Memory-to-Memory merge with " + + noInMemorySegments + " segments of total-size: " + mergeOutputSize); if (Thread.currentThread().isInterrupted()) { return; // early exit @@ -808,7 +809,7 @@ public void merge(List inputs) throws IOException, InterruptedExcepti TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT); writer.close(); - LOG.info(inputContext.getSourceVertexName() + + LOG.info(inputContext.getInputOutputVertexNames() + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete with mergeOutputSize=" + mergeOutputSize); @@ -1051,7 +1052,7 @@ public void merge(List inputs) throws IOException, InterruptedExcepti final long outputLen = localFS.getFileStatus(outputPath).getLen(); closeOnDiskFile(new FileChunk(outputPath, 0, outputLen)); - LOG.info(inputContext.getSourceVertexName() + + LOG.info(inputContext.getInputOutputVertexNames() + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." + diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index db5ef734de..2b99739a58 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -93,7 +93,7 @@ public class Shuffle implements ExceptionReporter { private volatile ListenableFuture runShuffleFuture; private final ListeningExecutorService executor; - private final String srcNameTrimmed; + private final String sourceDestNameTrimmed; private AtomicBoolean isShutDown = new AtomicBoolean(false); private AtomicBoolean fetchersClosed = new AtomicBoolean(false); @@ -109,7 +109,8 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs, this.inputContext = inputContext; this.conf = conf; - this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); + this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> " + + TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName()); this.codec = CodecUtils.getCodec(conf); @@ -138,7 +139,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs, TezCounter mergedMapOutputsCounter = inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS); - LOG.info(srcNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " inputs" + ", codec: " + LOG.info(sourceDestNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " inputs" + ", codec: " + (codec == null ? "None" : codec.getClass().getName()) + ", ifileReadAhead: " + ifileReadAhead); @@ -169,7 +170,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs, codec, ifileReadAhead, ifileReadAheadLength, - srcNameTrimmed); + sourceDestNameTrimmed); this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME); this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME); @@ -182,7 +183,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs, ShuffleUtils.isTezShuffleHandler(conf)); ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build()); + .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + sourceDestNameTrimmed + "}").build()); executor = MoreExecutors.listeningDecorator(rawExecutor); @@ -193,7 +194,7 @@ public void handleEvents(List events) throws IOException { if (!isShutDown.get()) { eventHandler.handleEvents(events); } else { - LOG.info(srcNameTrimmed + ": " + "Ignoring events since already shutdown. EventCount: " + events.size()); + LOG.info(sourceDestNameTrimmed + ": " + "Ignoring events since already shutdown. EventCount: " + events.size()); } } @@ -267,7 +268,7 @@ public void run() throws IOException { public void shutdown() { if (!isShutDown.getAndSet(true)) { // Interrupt so that the scheduler / merger sees this interrupt. - LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed); + LOG.info("Shutting down Shuffle for source: " + sourceDestNameTrimmed); runShuffleFuture.cancel(true); cleanupIgnoreErrors(); } @@ -323,7 +324,7 @@ protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedE } inputContext.inputIsReady(); - LOG.info("merge complete for input vertex : " + srcNameTrimmed); + LOG.info("merge complete for input vertex : " + sourceDestNameTrimmed); return kvIter; } } @@ -333,7 +334,8 @@ private void cleanupShuffleSchedulerIgnoreErrors() { cleanupShuffleScheduler(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the scheduler during cleanup. Ignoring"); + LOG.info(sourceDestNameTrimmed + ": " + + "Interrupted while attempting to close the scheduler during cleanup. Ignoring"); } } @@ -351,13 +353,14 @@ private void cleanupMerger(boolean ignoreErrors) throws Throwable { if (ignoreErrors) { //Reset the status Thread.currentThread().interrupt(); - LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the merger during cleanup. Ignoring"); + LOG.info(sourceDestNameTrimmed + ": " + + "Interrupted while attempting to close the merger during cleanup. Ignoring"); } else { throw e; } } catch (Throwable e) { if (ignoreErrors) { - LOG.info(srcNameTrimmed + ": " + "Exception while trying to shutdown merger, Ignoring", e); + LOG.info(sourceDestNameTrimmed + ": " + "Exception while trying to shutdown merger, Ignoring", e); } else { throw e; } @@ -379,7 +382,7 @@ private void cleanupIgnoreErrors() { } cleanupMerger(true); } catch (Throwable t) { - LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t); + LOG.info(sourceDestNameTrimmed + ": " + "Error in cleaning up.., ", t); } } @@ -388,7 +391,7 @@ private void cleanupIgnoreErrors() { public synchronized void reportException(Throwable t) { // RunShuffleCallable onFailure deals with ignoring errors on shutdown. if (throwable.get() == null) { - LOG.info(srcNameTrimmed + ": " + "Setting throwable in reportException with message [" + t.getMessage() + + LOG.info(sourceDestNameTrimmed + ": " + "Setting throwable in reportException with message [" + t.getMessage() + "] from thread [" + Thread.currentThread().getName()); throwable.set(t); throwingThreadName = Thread.currentThread().getName(); @@ -423,15 +426,15 @@ public static long getInitialMemoryRequirement(Configuration conf, long maxAvail private class ShuffleRunnerFutureCallback implements FutureCallback { @Override public void onSuccess(TezRawKeyValueIterator result) { - LOG.info(srcNameTrimmed + ": " + "Shuffle Runner thread complete"); + LOG.info(sourceDestNameTrimmed + ": " + "Shuffle Runner thread complete"); } @Override public void onFailure(Throwable t) { if (isShutDown.get()) { - LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring error"); + LOG.info(sourceDestNameTrimmed + ": " + "Already shutdown. Ignoring error"); } else { - LOG.error(srcNameTrimmed + ": " + "ShuffleRunner failed with error", t); + LOG.error(sourceDestNameTrimmed + ": " + "ShuffleRunner failed with error", t); // In case of an abort / Interrupt - the runtime makes sure that this is ignored. inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle Runner Failed"); cleanupIgnoreErrors(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index c97cfdf3ce..9984c5af10 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -77,7 +77,7 @@ public void handleEvents(List events) throws IOException { @Override public void logProgress(boolean updateOnClose) { - LOG.info(inputContext.getSourceVertexName() + ": " + LOG.info(inputContext.getInputOutputVertexNames() + ": " + "numDmeEventsSeen=" + numDmeEvents.get() + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get() + ", numObsoletionEventsSeen=" + numObsoletionEvents.get() diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index cd4b14ca1b..470b04cc5f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -222,8 +222,6 @@ enum ShuffleErrors { private final boolean localDiskFetchEnabled; private final String localHostname; private final int shufflePort; - private final String applicationId; - private final int dagId; private final boolean asyncHttp; private final boolean sslShuffle; @@ -338,8 +336,6 @@ public ShuffleScheduler(InputContext inputContext, TezRuntimeConfiguration .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT); - this.applicationId = inputContext.getApplicationId().toString(); - this.dagId = inputContext.getDagIdentifier(); this.localHostname = inputContext.getExecutionContext().getHostName(); String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); @@ -673,7 +669,7 @@ public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifi if (remainingMaps.get() == 0) { notifyAll(); // Notify the getHost() method. - LOG.info("All inputs fetched for input vertex : " + inputContext.getSourceVertexName()); + LOG.info("All inputs fetched for input vertex : " + inputContext.getInputOutputVertexNames()); } // update the status @@ -1474,17 +1470,17 @@ FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) { if (enableFetcherTestingErrors) { return new FetcherOrderedGroupedWithInjectableErrors(httpConnectionParams, ShuffleScheduler.this, allocator, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, - codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, + codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, - verifyDiskChecksum, compositeFetch, inputContext.getObjectRegistry()); + connectionErrsCounter, wrongReduceErrsCounter, asyncHttp, sslShuffle, + verifyDiskChecksum, compositeFetch, inputContext); } else { return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, - codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, + codec, conf, localFs, localDiskFetchEnabled, localHostname, shufflePort, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, - verifyDiskChecksum, compositeFetch); + connectionErrsCounter, wrongReduceErrsCounter, asyncHttp, sslShuffle, + verifyDiskChecksum, compositeFetch, inputContext); } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 313c13d188..1463cfabbe 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -104,7 +104,7 @@ public synchronized List initialize() throws IOException { isStarted.set(true); getContext().inputIsReady(); LOG.info("input fetch not required since there are 0 physical inputs for input vertex: " - + getContext().getSourceVertexName()); + + getContext().getInputOutputVertexNames()); return Collections.emptyList(); } @@ -305,7 +305,7 @@ protected synchronized void createValuesIterator() RawComparator rawComparator = ConfigUtils.getIntermediateInputKeyComparator(conf); Class keyClass = ConfigUtils.getIntermediateInputKeyClass(conf); Class valClass = ConfigUtils.getIntermediateInputValueClass(conf); - LOG.info(getContext().getSourceVertexName() + ": " + "creating ValuesIterator with " + LOG.info(getContext().getInputOutputVertexNames() + ": " + "creating ValuesIterator with " + "comparator=" + rawComparator.getClass().getName() + ", keyClass=" + keyClass.getName() + ", valClass=" + valClass.getName()); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index c67c405b43..38d5295094 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -92,7 +92,7 @@ public synchronized List initialize() throws Exception { isStarted.set(true); getContext().inputIsReady(); LOG.info("input fetch not required since there are 0 physical inputs for input vertex: " - + getContext().getSourceVertexName()); + + getContext().getInputOutputVertexNames()); return Collections.emptyList(); } else { long initalMemReq = getInitialMemoryReq(); @@ -148,7 +148,7 @@ public synchronized void start() throws IOException { pendingEvents.drainTo(pending); if (pending.size() > 0) { if (LOG.isDebugEnabled()) { - LOG.debug(getContext().getSourceVertexName() + ": " + "NoAutoStart delay in processing first event: " + LOG.debug(getContext().getInputOutputVertexNames() + ": " + "NoAutoStart delay in processing first event: " + (System.currentTimeMillis() - firstEventReceivedTime)); } inputEventHandler.handleEvents(pending); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index cea3272818..338f640f91 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -81,7 +81,7 @@ public void testLocalFetchModeSetting() throws Exception { final boolean DISABLE_LOCAL_FETCH = false; Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, + createMockInputContext(), null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, false, true, false); builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); @@ -100,7 +100,7 @@ public void testLocalFetchModeSetting() throws Exception { // when enabled and hostname does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, + createMockInputContext(), null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, false, true, false); builder.assignWork(HOST + "_OTHER", PORT, 0, 1, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -116,7 +116,7 @@ public void testLocalFetchModeSetting() throws Exception { // when enabled and port does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, + createMockInputContext(), null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, false, true, false); builder.assignWork(HOST, PORT + 1, 0, 1, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -133,7 +133,7 @@ public void testLocalFetchModeSetting() throws Exception { // When disabled use http fetch conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, + createMockInputContext(), null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, false, true, false); builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -167,7 +167,7 @@ public void testSetupLocalDiskFetch() throws Exception { int partition = 42; FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, - ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, + createMockInputContext(), null, conf, true, HOST, PORT, false, true, true); ArrayList inputAttemptIdentifiers = new ArrayList<>(); for(CompositeInputAttemptIdentifier compositeInputAttemptIdentifier : srcAttempts) { @@ -306,7 +306,7 @@ public void testInputAttemptIdentifierMap() { int partition = 42; FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, - ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, + createMockInputContext(), null, conf, true, HOST, PORT, false, true, false); builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); @@ -330,7 +330,7 @@ public void testShuffleHandlerDiskErrorUnordered() doReturn("vertex").when(inputContext).getSourceVertexName(); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(mock(ShuffleManager.class), null, - null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, + null, createMockInputContext(), null, conf, true, HOST, PORT, false, true, false); builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(new InputAttemptIdentifier(0, 0))); @@ -345,4 +345,15 @@ public void testShuffleHandlerDiskErrorUnordered() Assert.assertTrue(failures[0].isDiskErrorAtSource()); Assert.assertFalse(failures[0].isLocalFetch()); } + + private InputContext createMockInputContext() { + InputContext inputContext = mock(InputContext.class); + + doReturn(ApplicationId.newInstance(0, 1)).when(inputContext).getApplicationId(); + doReturn(1).when(inputContext).getDagIdentifier(); + doReturn("sourceVertex").when(inputContext).getSourceVertexName(); + doReturn("taskVertex").when(inputContext).getTaskVertexName(); + + return inputContext; + } } diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index 683422b1a7..8a774dc1a5 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -189,6 +189,7 @@ private InputContext createInputContext() throws IOException { InputContext inputContext = mock(InputContext.class); doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("sourceVertex").when(inputContext).getSourceVertexName(); + doReturn("taskVertex").when(inputContext).getTaskVertexName(); doReturn(shuffleMetaData).when(inputContext) .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index 041fd03854..fda2c896d1 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -169,6 +169,7 @@ private InputContext createInputContext() throws IOException { InputContext inputContext = mock(InputContext.class); doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("sourceVertex").when(inputContext).getSourceVertexName(); + doReturn("taskVertex").when(inputContext).getTaskVertexName(); doReturn(shuffleMetaData).when(inputContext) .getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT)); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 028fbce96a..f283780c12 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -90,7 +90,7 @@ public class TestFetcher { public static final String HOST = "localhost"; public static final int PORT = 65; public static final int DAG_ID = 1; - public static final String APP_ID = "application_1234_1"; + public static final ApplicationId APP_ID = ApplicationId.newInstance(0, 1); private TezCounters tezCounters = new TezCounters(); private TezCounter ioErrsCounter = tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME, @@ -121,10 +121,6 @@ public void testInputsReturnedOnConnectionException() throws Exception { Shuffle shuffle = mock(Shuffle.class); - InputContext inputContext = mock(InputContext.class); - doReturn(new TezCounters()).when(inputContext).getCounters(); - doReturn("src vertex").when(inputContext).getSourceVertexName(); - MapHost mapHost = new MapHost(HOST, PORT, 0, 1); InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt"); mapHost.addKnownMap(inputAttemptIdentifier); @@ -133,9 +129,9 @@ public void testInputsReturnedOnConnectionException() throws Exception { FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, - getRawFs(conf), false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, + getRawFs(conf), false, HOST, PORT, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, - wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); fetcher.call(); verify(scheduler).getMapsForHost(mapHost); @@ -151,18 +147,14 @@ public void testLocalFetchModeSetting1() throws Exception { MergeManager merger = mock(MergeManager.class); Shuffle shuffle = mock(Shuffle.class); - InputContext inputContext = mock(InputContext.class); - doReturn(new TezCounters()).when(inputContext).getCounters(); - doReturn("src vertex").when(inputContext).getSourceVertexName(); - final boolean ENABLE_LOCAL_FETCH = true; final boolean DISABLE_LOCAL_FETCH = false; MapHost mapHost = new MapHost(HOST, PORT, 0, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, - getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, + getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, - wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); // when local mode is enabled and host and port matches use local fetch FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -177,9 +169,9 @@ public void testLocalFetchModeSetting1() throws Exception { mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, - getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, + getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, - wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -192,9 +184,9 @@ public void testLocalFetchModeSetting1() throws Exception { mapHost = new MapHost(HOST, PORT + 1, 0, 1); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, - getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, + getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, - wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -206,9 +198,9 @@ public void testLocalFetchModeSetting1() throws Exception { //if local fetch is not enabled mapHost = new MapHost(HOST, PORT, 0, 1); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, - conf, getRawFs(conf), DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, + conf, getRawFs(conf), DISABLE_LOCAL_FETCH, HOST, PORT, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, - wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -224,15 +216,12 @@ public void testSetupLocalDiskFetch() throws Exception { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); Shuffle shuffle = mock(Shuffle.class); - InputContext inputContext = mock(InputContext.class); - when(inputContext.getCounters()).thenReturn(new TezCounters()); - when(inputContext.getSourceVertexName()).thenReturn(""); MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, - null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src vertex", host, + null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + connectionErrsCounter, wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -338,15 +327,12 @@ public void testSetupLocalDiskFetchEmptyPartitions() throws Exception { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); Shuffle shuffle = mock(Shuffle.class); - InputContext inputContext = mock(InputContext.class); - when(inputContext.getCounters()).thenReturn(new TezCounters()); - when(inputContext.getSourceVertexName()).thenReturn(""); MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, - null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src vertex", host, + null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + connectionErrsCounter, wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); FetcherOrderedGrouped spyFetcher = spy(fetcher); final List srcAttempts = Arrays.asList( @@ -413,15 +399,12 @@ public void testSetupLocalDiskFetchAutoReduce() throws Exception { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); Shuffle shuffle = mock(Shuffle.class); - InputContext inputContext = mock(InputContext.class); - when(inputContext.getCounters()).thenReturn(new TezCounters()); - when(inputContext.getSourceVertexName()).thenReturn(""); MapHost host = new MapHost(HOST, PORT, 1, 2); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, - null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src vertex", host, + null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + connectionErrsCounter, wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -593,17 +576,12 @@ public void testWithRetry() throws Exception { MergeManager merger = mock(MergeManager.class); Shuffle shuffle = mock(Shuffle.class); - InputContext inputContext = mock(InputContext.class); - when(inputContext.getCounters()).thenReturn(new TezCounters()); - when(inputContext.getSourceVertexName()).thenReturn(""); - when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1)); - HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); final MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, - null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex", host, + null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + connectionErrsCounter, wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); final FetcherOrderedGrouped fetcher = spy(mockFetcher); @@ -676,11 +654,6 @@ public void testAsyncWithException() throws Exception { MergeManager merger = mock(MergeManager.class); Shuffle shuffle = mock(Shuffle.class); - TezCounters counters = new TezCounters(); - InputContext inputContext = mock(InputContext.class); - when(inputContext.getCounters()).thenReturn(counters); - when(inputContext.getSourceVertexName()).thenReturn(""); - JobTokenSecretManager jobMgr = mock(JobTokenSecretManager.class); doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class)); @@ -688,9 +661,9 @@ public void testAsyncWithException() throws Exception { final MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr, false, - 0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex", host, ioErrsCounter, + 0, null, conf, getRawFs(conf), false, HOST, PORT, host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, - wrongReduceErrsCounter, APP_ID, DAG_ID, true, false, true, false); + wrongReduceErrsCounter, true, false, true, false, createMockInputContext()); final FetcherOrderedGrouped fetcher = spy(mockFetcher); fetcher.remaining = new LinkedHashMap(); final List srcAttempts = Arrays.asList( @@ -753,9 +726,9 @@ public void testInputAttemptIdentifierMap() { Shuffle shuffle = mock(Shuffle.class); MapHost mapHost = new MapHost(HOST, PORT, 0, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, - null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex", mapHost, + null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + connectionErrsCounter, wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); fetcher.populateRemainingMap(new LinkedList(Arrays.asList(srcAttempts))); Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size()); Iterator> iterator = fetcher.remaining.entrySet().iterator(); @@ -773,9 +746,9 @@ public void testShuffleHandlerDiskErrorOrdered() InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt"); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, null, null, null, null, false, - 0, null, new TezConfiguration(), null, false, HOST, PORT, "src vertex", mapHost, + 0, null, new TezConfiguration(), null, false, HOST, PORT, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false); + connectionErrsCounter, wrongReduceErrsCounter, false, false, true, false, createMockInputContext()); fetcher.remaining = new HashMap(); ShuffleHeader header = @@ -800,4 +773,17 @@ private RawLocalFileSystem getRawFs(Configuration conf) { throw new RuntimeException(e); } } + + + private InputContext createMockInputContext() { + InputContext inputContext = mock(InputContext.class); + + doReturn(APP_ID).when(inputContext).getApplicationId(); + doReturn(DAG_ID).when(inputContext).getDagIdentifier(); + doReturn(new TezCounters()).when(inputContext).getCounters(); + doReturn("src vertex").when(inputContext).getSourceVertexName(); + doReturn("task_Vertex").when(inputContext).getTaskVertexName(); + + return inputContext; + } } diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java index a28b1fa1a5..0a9c37e9ea 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java @@ -144,6 +144,7 @@ private InputContext createTezInputContext() throws IOException { InputContext inputContext = mock(InputContext.class); doReturn(applicationId).when(inputContext).getApplicationId(); doReturn("sourceVertex").when(inputContext).getSourceVertexName(); + doReturn("taskVertex").when(inputContext).getTaskVertexName(); when(inputContext.getCounters()).thenReturn(new TezCounters()); ExecutionContext executionContext = new ExecutionContextImpl("localhost"); doReturn(executionContext).when(inputContext).getExecutionContext(); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index 56bfe49dc8..36bb983d1d 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -352,7 +352,8 @@ public void handleEvents(List inputEvents) throws Exception { if (event instanceof DataMovementEvent) { DataMovementEvent dmEvent = (DataMovementEvent) event; numCompletedInputs++; - LOG.info(getContext().getSourceVertexName() + " Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + + LOG.info(getContext().getInputOutputVertexNames() + + " Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + " targetId: " + dmEvent.getTargetIndex() + " version: " + dmEvent.getVersion() + " numInputs: " + getNumPhysicalInputs() +