Skip to content

Commit

Permalink
TEZ-4340: replace srcNameTrimmed in ShuffleManager and ShuffleScheduler
Browse files: browse the repository at this point in the history
  • Loading branch information
csjuhasz-c committed Jan 12, 2022
1 parent 0200dd9 commit ba385cf
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public class ShuffleManager implements FetcherCallback {
*/
private final int maxTimeToWaitForReportMillis;

private final String srcNameTrimmed;
private final String sourceDestNameTrimmed;

private final int maxTaskOutputAtOnce;

Expand Down Expand Up @@ -264,8 +264,9 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu
conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS_DEFAULT);

this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());

this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> "
+ TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName());

completedInputSet = new BitSet(numInputs);
/**
* In case of pipelined shuffle, it is possible to get multiple FetchedInput per attempt.
Expand All @@ -288,15 +289,15 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu
if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL_DEFAULT)) {
fetcherRawExecutor = inputContext.createTezFrameworkExecutorService(numFetchers,
"Fetcher_B {" + srcNameTrimmed + "} #%d");
"Fetcher_B {" + sourceDestNameTrimmed + "} #%d");
} else {
fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Fetcher_B {" + srcNameTrimmed + "} #%d").build());
.setDaemon(true).setNameFormat("Fetcher_B {" + sourceDestNameTrimmed + "} #%d").build());
}
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);

ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}").build());
.setDaemon(true).setNameFormat("ShuffleRunner {" + sourceDestNameTrimmed + "}").build());
this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
this.schedulerCallable = new RunShuffleCallable(conf);

Expand Down Expand Up @@ -336,7 +337,7 @@ public ShuffleManager(InputContext inputContext, Configuration conf, int numInpu

shuffleInfoEventsMap = new ConcurrentHashMap<Integer, ShuffleEventInfo>();

LOG.info(srcNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec="
LOG.info(sourceDestNameTrimmed + ": numInputs=" + numInputs + ", compressionCodec="
+ (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
+ numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
+ ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
Expand All @@ -361,7 +362,7 @@ public void run() throws IOException {
if (maxTimeToWaitForReportMillis > 0) {
reporterExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}")
.setNameFormat("ShuffleRunner {" + sourceDestNameTrimmed + "}")
.build());
Future reporterFuture = reporterExecutor.submit(new ReporterCallable());
}
Expand Down Expand Up @@ -446,7 +447,7 @@ protected Void callInternal() throws Exception {
break;
}

LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, numCompletedInputs);
LOG.debug("{}: NumCompletedInputs: {}", sourceDestNameTrimmed, numCompletedInputs);
if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
lock.lock();
try {
Expand All @@ -458,22 +459,22 @@ protected Void callInternal() throws Exception {
inputHost = pendingHosts.take();
} catch (InterruptedException e) {
if (isShutdown.get()) {
LOG.info(srcNameTrimmed + ": " + "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
LOG.info(sourceDestNameTrimmed + ": " + "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
Thread.currentThread().interrupt();
break;
} else {
throw e;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " +
LOG.debug(sourceDestNameTrimmed + ": " + "Processing pending host: " +
inputHost.toDetailedString());
}
if (inputHost.getNumPendingPartitions() > 0 && !isShutdown.get()) {
Fetcher fetcher = constructFetcherForHost(inputHost, conf);
runningFetchers.add(fetcher);
if (isShutdown.get()) {
LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown," +
LOG.info(sourceDestNameTrimmed + ": " + "hasBeenShutdown," +
"Breaking out of ShuffleScheduler Loop");
break;
}
Expand All @@ -485,7 +486,7 @@ protected Void callInternal() throws Exception {
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(srcNameTrimmed + ": " + "Skipping host: " +
LOG.debug(sourceDestNameTrimmed + ": " + "Skipping host: " +
inputHost.getIdentifier() +
" since it has no inputs to process");
}
Expand All @@ -497,7 +498,7 @@ protected Void callInternal() throws Exception {
}
}
shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
LOG.info(srcNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
LOG.info(sourceDestNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
if (!fetcherExecutor.isShutdown()) {
fetcherExecutor.shutdownNow();
}
Expand Down Expand Up @@ -540,13 +541,13 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {

if (sharedFetchEnabled) {
// pick a single lock disk from the edge name's hashcode + host hashcode
final int h = Math.abs(Objects.hashCode(this.srcNameTrimmed, inputHost.getHost()));
final int h = Math.abs(Objects.hashCode(this.sourceDestNameTrimmed, inputHost.getHost()));
lockDisk = new Path(this.localDisks[h % this.localDisks.length], "locks");
}

FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
jobTokenSecretMgr, sourceDestNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
localhostName, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch, enableFetcherTestingErrors,
inputContext.getObjectRegistry());
Expand Down Expand Up @@ -632,7 +633,7 @@ public void addKnownInput(String hostName, int port,
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(srcNameTrimmed + ": " + "Adding input: " +
LOG.debug(sourceDestNameTrimmed + ": " + "Adding input: " +
srcAttemptIdentifier + ", to host: " + host);
}

Expand Down Expand Up @@ -950,7 +951,7 @@ public void fetchFailed(String host,
LOG.info(
"{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, "
+ "local fetch: {}, remote fetch failure reported as local failure: {})",
srcNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed,
sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed,
inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource());
failedShufflesCounter.increment(1);
inputContext.notifyProgress();
Expand Down Expand Up @@ -988,11 +989,11 @@ public void shutdown() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
//TODO: need to cleanup all FetchedInput (DiskFetchedInput, LocalDiskFetchedInput), lockFile
//As of now relying on job cleanup (when all directories would be cleared)
LOG.info(srcNameTrimmed + ": " + "Thread interrupted. Need to cleanup the local dirs");
LOG.info(sourceDestNameTrimmed + ": " + "Thread interrupted. Need to cleanup the local dirs");
}
if (!isShutdown.getAndSet(true)) {
// Shut down any pending fetchers
LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": "
LOG.info("Shutting down pending fetchers on source" + sourceDestNameTrimmed + ": "
+ runningFetchers.size());
lock.lock();
try {
Expand Down Expand Up @@ -1140,15 +1141,15 @@ private class SchedulerFutureCallback implements FutureCallback<Void> {

@Override
public void onSuccess(Void result) {
LOG.info(srcNameTrimmed + ": " + "Scheduler thread completed");
LOG.info(sourceDestNameTrimmed + ": " + "Scheduler thread completed");
}

@Override
public void onFailure(Throwable t) {
if (isShutdown.get()) {
LOG.debug("{}: Already shutdown. Ignoring error.", srcNameTrimmed, t);
LOG.debug("{}: Already shutdown. Ignoring error.", sourceDestNameTrimmed, t);
} else {
LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t);
LOG.error(sourceDestNameTrimmed + ": " + "Scheduler failed with error: ", t);
inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle Scheduler Failed");
}
}
Expand Down Expand Up @@ -1177,7 +1178,7 @@ private void doBookKeepingForFetcherComplete() {
public void onSuccess(FetchResult result) {
fetcher.shutdown();
if (isShutdown.get()) {
LOG.debug("{}: Already shutdown. Ignoring event from fetcher", srcNameTrimmed);
LOG.debug("{}: Already shutdown. Ignoring event from fetcher", sourceDestNameTrimmed);
} else {
Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
Expand All @@ -1200,9 +1201,9 @@ public void onFailure(Throwable t) {
// Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down.
fetcher.shutdown();
if (isShutdown.get()) {
LOG.debug("{}: Already shutdown. Ignoring error from fetcher.", srcNameTrimmed, t);
LOG.debug("{}: Already shutdown. Ignoring error from fetcher.", sourceDestNameTrimmed, t);
} else {
LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t);
LOG.error(sourceDestNameTrimmed + ": " + "Fetcher failed with error: ", t);
shuffleError = t;
inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Fetch failed");
doBookKeepingForFetcherComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class Shuffle implements ExceptionReporter {
private volatile ListenableFuture<TezRawKeyValueIterator> runShuffleFuture;
private final ListeningExecutorService executor;

private final String srcNameTrimmed;
private final String sourceDestNameTrimmed;

private AtomicBoolean isShutDown = new AtomicBoolean(false);
private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
Expand All @@ -109,7 +109,8 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
this.inputContext = inputContext;
this.conf = conf;

this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> "
+ TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName());

this.codec = CodecUtils.getCodec(conf);

Expand Down Expand Up @@ -138,7 +139,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
TezCounter mergedMapOutputsCounter =
inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);

LOG.info(srcNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " inputs" + ", codec: "
LOG.info(sourceDestNameTrimmed + ": " + "Shuffle assigned with " + numInputs + " inputs" + ", codec: "
+ (codec == null ? "None" : codec.getClass().getName())
+ ", ifileReadAhead: " + ifileReadAhead);

Expand Down Expand Up @@ -169,7 +170,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
codec,
ifileReadAhead,
ifileReadAheadLength,
srcNameTrimmed);
sourceDestNameTrimmed);

this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
Expand All @@ -182,7 +183,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
ShuffleUtils.isTezShuffleHandler(conf));

ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build());
.setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + sourceDestNameTrimmed + "}").build());


executor = MoreExecutors.listeningDecorator(rawExecutor);
Expand All @@ -193,7 +194,7 @@ public void handleEvents(List<Event> events) throws IOException {
if (!isShutDown.get()) {
eventHandler.handleEvents(events);
} else {
LOG.info(srcNameTrimmed + ": " + "Ignoring events since already shutdown. EventCount: " + events.size());
LOG.info(sourceDestNameTrimmed + ": " + "Ignoring events since already shutdown. EventCount: " + events.size());
}

}
Expand Down Expand Up @@ -267,7 +268,7 @@ public void run() throws IOException {
public void shutdown() {
if (!isShutDown.getAndSet(true)) {
// Interrupt so that the scheduler / merger sees this interrupt.
LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed);
LOG.info("Shutting down Shuffle for source: " + sourceDestNameTrimmed);
runShuffleFuture.cancel(true);
cleanupIgnoreErrors();
}
Expand Down Expand Up @@ -323,7 +324,7 @@ protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedE
}

inputContext.inputIsReady();
LOG.info("merge complete for input vertex : " + srcNameTrimmed);
LOG.info("merge complete for input vertex : " + sourceDestNameTrimmed);
return kvIter;
}
}
Expand All @@ -333,7 +334,7 @@ private void cleanupShuffleSchedulerIgnoreErrors() {
cleanupShuffleScheduler();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the scheduler during cleanup. Ignoring");
LOG.info(sourceDestNameTrimmed + ": " + "Interrupted while attempting to close the scheduler during cleanup. Ignoring");
}
}

Expand All @@ -351,13 +352,13 @@ private void cleanupMerger(boolean ignoreErrors) throws Throwable {
if (ignoreErrors) {
//Reset the status
Thread.currentThread().interrupt();
LOG.info(srcNameTrimmed + ": " + "Interrupted while attempting to close the merger during cleanup. Ignoring");
LOG.info(sourceDestNameTrimmed + ": " + "Interrupted while attempting to close the merger during cleanup. Ignoring");
} else {
throw e;
}
} catch (Throwable e) {
if (ignoreErrors) {
LOG.info(srcNameTrimmed + ": " + "Exception while trying to shutdown merger, Ignoring", e);
LOG.info(sourceDestNameTrimmed + ": " + "Exception while trying to shutdown merger, Ignoring", e);
} else {
throw e;
}
Expand All @@ -379,7 +380,7 @@ private void cleanupIgnoreErrors() {
}
cleanupMerger(true);
} catch (Throwable t) {
LOG.info(srcNameTrimmed + ": " + "Error in cleaning up.., ", t);
LOG.info(sourceDestNameTrimmed + ": " + "Error in cleaning up.., ", t);
}
}

Expand All @@ -388,7 +389,7 @@ private void cleanupIgnoreErrors() {
public synchronized void reportException(Throwable t) {
// RunShuffleCallable onFailure deals with ignoring errors on shutdown.
if (throwable.get() == null) {
LOG.info(srcNameTrimmed + ": " + "Setting throwable in reportException with message [" + t.getMessage() +
LOG.info(sourceDestNameTrimmed + ": " + "Setting throwable in reportException with message [" + t.getMessage() +
"] from thread [" + Thread.currentThread().getName());
throwable.set(t);
throwingThreadName = Thread.currentThread().getName();
Expand Down Expand Up @@ -423,15 +424,15 @@ public static long getInitialMemoryRequirement(Configuration conf, long maxAvail
private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> {
@Override
public void onSuccess(TezRawKeyValueIterator result) {
LOG.info(srcNameTrimmed + ": " + "Shuffle Runner thread complete");
LOG.info(sourceDestNameTrimmed + ": " + "Shuffle Runner thread complete");
}

@Override
public void onFailure(Throwable t) {
if (isShutDown.get()) {
LOG.info(srcNameTrimmed + ": " + "Already shutdown. Ignoring error");
LOG.info(sourceDestNameTrimmed + ": " + "Already shutdown. Ignoring error");
} else {
LOG.error(srcNameTrimmed + ": " + "ShuffleRunner failed with error", t);
LOG.error(sourceDestNameTrimmed + ": " + "ShuffleRunner failed with error", t);
// In case of an abort / Interrupt - the runtime makes sure that this is ignored.
inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle Runner Failed");
cleanupIgnoreErrors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ private InputContext createInputContext() throws IOException {
InputContext inputContext = mock(InputContext.class);
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
doReturn("taskVertex").when(inputContext).getTaskVertexName();
doReturn(shuffleMetaData).when(inputContext)
.getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ private InputContext createInputContext() throws IOException {
InputContext inputContext = mock(InputContext.class);
doReturn(new TezCounters()).when(inputContext).getCounters();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
doReturn("taskVertex").when(inputContext).getTaskVertexName();
doReturn(shuffleMetaData).when(inputContext)
.getServiceProviderMetaData(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ private InputContext createTezInputContext() throws IOException {
InputContext inputContext = mock(InputContext.class);
doReturn(applicationId).when(inputContext).getApplicationId();
doReturn("sourceVertex").when(inputContext).getSourceVertexName();
doReturn("taskVertex").when(inputContext).getTaskVertexName();
when(inputContext.getCounters()).thenReturn(new TezCounters());
ExecutionContext executionContext = new ExecutionContextImpl("localhost");
doReturn(executionContext).when(inputContext).getExecutionContext();
Expand Down

0 comments on commit ba385cf

Please sign in to comment.