Skip to content

Commit

Permalink
TEZ-4340: Show convenient input -> output vertex names in input messa…
Browse files Browse the repository at this point in the history
…ges (#170) (Csaba Juhasz reviewed by Laszlo Bodor)
  • Loading branch information
csjuhasz-c committed Jan 17, 2022
1 parent 0b3f3b6 commit cc8249e
Show file tree
Hide file tree
Showing 23 changed files with 209 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public interface InputContext extends TaskContext {
* @return Name of the Source Vertex
*/
public String getSourceVertexName();

/**
* Returns a convenient, human-readable string describing the input and output vertices.
* @return the convenient string
*/
String getInputOutputVertexNames();

/**
* Get the index of the input in the set of all inputs for the task. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public List<Event> initialize() throws IOException {
getContext().inputIsReady();
this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
LOG.info(getContext().getInputOutputVertexNames() + " using newmapreduce API=" + useNewApi +
", split via event=" + splitInfoViaEvents + ", numPhysicalInputs=" +
getNumPhysicalInputs());
initializeInternal();
Expand Down Expand Up @@ -526,7 +526,7 @@ inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
} finally {
rrLock.unlock();
}
LOG.info("Initialized MRInput: " + getContext().getSourceVertexName());
LOG.info("Initialized MRInput: " + getContext().getInputOutputVertexNames());
}

/**
Expand Down Expand Up @@ -634,7 +634,7 @@ void processSplitEvent(InputDataInformationEvent event)
try {
initFromEventInternal(event);
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " notifying on RecordReader initialized");
LOG.debug(getContext().getInputOutputVertexNames() + " notifying on RecordReader initialized");
}
rrInited.signal();
} finally {
Expand All @@ -647,7 +647,7 @@ void checkAndAwaitRecordReaderInitialization() throws IOException {
rrLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " awaiting RecordReader initialization");
LOG.debug(getContext().getInputOutputVertexNames() + " awaiting RecordReader initialization");
}
rrInited.await();
} catch (Exception e) {
Expand All @@ -671,7 +671,7 @@ void initFromEvent(InputDataInformationEvent initEvent)

private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " initializing RecordReader from event");
LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event");
}
Objects.requireNonNull(initEvent, "InitEvent must be specified");
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
Expand All @@ -686,7 +686,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
LOG.warn("Thread interrupted while getting split length: ", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
}

Expand All @@ -696,7 +696,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
splitObj = split;
splitLength = split.getLength();
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
}
}
Expand All @@ -705,7 +705,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
.increment(splitLength);
}
mrReader.setSplit(splitObj);
LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
LOG.info(getContext().getInputOutputVertexNames() + " initialized RecordReader from event");
}

private static class MRInputHelpersInternal extends MRInputHelpers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public MRInputLegacy(InputContext inputContext, int numPhysicalInputs) {

@Private
protected void initializeInternal() throws IOException {
LOG.info(getContext().getSourceVertexName() + " MRInputLegacy deferring initialization");
LOG.info(getContext().getInputOutputVertexNames() + " MRInputLegacy deferring initialization");
}

@Private
Expand Down Expand Up @@ -136,7 +136,7 @@ void checkAndAwaitRecordReaderInitialization() throws IOException {
if (splitInfoViaEvents && !inited) {
if (initEvent == null) {
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() +
LOG.debug(getContext().getInputOutputVertexNames() +
" awaiting init event before initializing record reader");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private MultiMRInputConfigBuilder(Configuration conf, Class<?> inputFormat) {
@Override
public List<Event> initialize() throws IOException {
super.initialize();
LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
LOG.info(getContext().getInputOutputVertexNames() + " using newmapreduce API=" + useNewApi +
", numPhysicalInputs=" + getNumPhysicalInputs());
if (getNumPhysicalInputs() == 0) {
getContext().inputIsReady();
Expand Down Expand Up @@ -167,7 +167,7 @@ public void handleEvents(List<Event> inputEvents) throws Exception {
private MRReader initFromEvent(InputDataInformationEvent event) throws IOException {
Objects.requireNonNull(event, "Event must be specified");
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + eventCount.get());
LOG.debug(getContext().getInputOutputVertexNames() + " initializing Reader: " + eventCount.get());
}
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
MRReader reader = null;
Expand All @@ -186,7 +186,7 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti
.getApplicationId().getId(), getContext().getTaskIndex(), getContext()
.getTaskAttemptNumber(), getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
}
} else {
Expand All @@ -196,15 +196,15 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti
reader = new MRReaderMapred(localJobConf, split,
getContext().getCounters(), inputRecordCounter, getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
}
}
if (splitLength != -1) {
getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
.increment(splitLength);
}
LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
LOG.info(getContext().getInputOutputVertexNames() + " initialized RecordReader from event");
return reader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ public String getSourceVertexName() {
return sourceVertexName;
}

@Override
public String getInputOutputVertexNames() {
return String.format("%s -> %s", getSourceVertexName(), getTaskVertexName());
}

@Override
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
Expand All @@ -69,6 +69,7 @@
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezUtilsInternal;

/**
* Responsible for fetching inputs served by the ShuffleHandler for a single
Expand Down Expand Up @@ -194,8 +195,8 @@ public String getHost() {
private final boolean isDebugEnabled = LOG.isDebugEnabled();

protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretManager, Configuration conf,
RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator,
Path lockPath,
Expand All @@ -208,8 +209,8 @@ protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
this.appId = appId;
this.dagIdentifier = dagIdentifier;
this.appId = inputContext.getApplicationId();
this.dagIdentifier = inputContext.getDagIdentifier();
this.pathToAttemptMap = new HashMap<PathPartition, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
Expand All @@ -218,7 +219,10 @@ protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
this.sharedFetchEnabled = sharedFetchEnabled;

this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
this.logIdentifier = " fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;

String sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> "
+ TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName());
this.logIdentifier = " fetcher [" + sourceDestNameTrimmed +"] " + fetcherIdentifier;

this.localFs = localFs;
this.localDirAllocator = localDirAllocator;
Expand Down Expand Up @@ -1133,31 +1137,29 @@ public static class FetcherBuilder {
private boolean workAssigned = false;

public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretMgr, Configuration conf, boolean localDiskFetchEnabled,
String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, inputContext,
jobTokenSecretMgr, conf, null, null, null, localDiskFetchEnabled,
false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch);
}

public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, RawLocalFileSystem localFs,
HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretMgr, Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch,
boolean enableFetcherTestingErrors, ObjectRegistry objectRegistry) {
boolean enableFetcherTestingErrors) {
if (enableFetcherTestingErrors) {
this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, inputContext,
jobTokenSecretMgr, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
verifyDiskChecksum, compositeFetch, objectRegistry);
verifyDiskChecksum, compositeFetch);
} else {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, inputContext,
jobTokenSecretMgr, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
verifyDiskChecksum, compositeFetch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,16 +35,16 @@ public class FetcherWithInjectableErrors extends Fetcher {
private String srcNameTrimmed;

protected FetcherWithInjectableErrors(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretManager, Configuration conf,
RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled,
boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum,
boolean compositeFetch, ObjectRegistry objectRegistry) {
super(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretManager, srcNameTrimmed, conf,
boolean compositeFetch) {
super(fetcherCallback, params, inputManager, inputContext, jobTokenSecretManager, conf,
localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort,
asyncHttp, verifyDiskChecksum, compositeFetch);
this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, objectRegistry);
this.srcNameTrimmed = srcNameTrimmed;
this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, inputContext.getObjectRegistry());
this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
LOG.info("Initialized FetcherWithInjectableErrors with config: {}", fetcherErrorTestingConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void handleEvent(Event event) throws IOException {

@Override
public void logProgress(boolean updateOnClose) {
LOG.info(inputContext.getSourceVertexName() + ": "
LOG.info(inputContext.getInputOutputVertexNames() + ": "
+ "numDmeEventsSeen=" + numDmeEvents.get()
+ ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get()
+ ", numObsoletionEventsSeen=" + numObsoletionEvents.get()
Expand Down
Loading

0 comments on commit cc8249e

Please sign in to comment.