Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4340: Show convenient input -> output vertex names in input messages #170

Merged
merged 1 commit into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public interface InputContext extends TaskContext {
* @return Name of the Source Vertex
*/
public String getSourceVertexName();

/**
* Returns a convenient, human-readable string describing the input and output vertices.
* @return the convenient string
*/
String getInputOutputVertexNames();

/**
* Get the index of the input in the set of all inputs for the task. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public List<Event> initialize() throws IOException {
getContext().inputIsReady();
this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
LOG.info(getContext().getInputOutputVertexNames() + " using newmapreduce API=" + useNewApi +
", split via event=" + splitInfoViaEvents + ", numPhysicalInputs=" +
getNumPhysicalInputs());
initializeInternal();
Expand Down Expand Up @@ -526,7 +526,7 @@ inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
} finally {
rrLock.unlock();
}
LOG.info("Initialized MRInput: " + getContext().getSourceVertexName());
LOG.info("Initialized MRInput: " + getContext().getInputOutputVertexNames());
}

/**
Expand Down Expand Up @@ -634,7 +634,7 @@ void processSplitEvent(InputDataInformationEvent event)
try {
initFromEventInternal(event);
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " notifying on RecordReader initialized");
LOG.debug(getContext().getInputOutputVertexNames() + " notifying on RecordReader initialized");
}
rrInited.signal();
} finally {
Expand All @@ -647,7 +647,7 @@ void checkAndAwaitRecordReaderInitialization() throws IOException {
rrLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " awaiting RecordReader initialization");
LOG.debug(getContext().getInputOutputVertexNames() + " awaiting RecordReader initialization");
}
rrInited.await();
} catch (Exception e) {
Expand All @@ -671,7 +671,7 @@ void initFromEvent(InputDataInformationEvent initEvent)

private void initFromEventInternal(InputDataInformationEvent initEvent) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " initializing RecordReader from event");
LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event");
}
Objects.requireNonNull(initEvent, "InitEvent must be specified");
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
Expand All @@ -686,7 +686,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
LOG.warn("Thread interrupted while getting split length: ", e);
}
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
}

Expand All @@ -696,7 +696,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
splitObj = split;
splitLength = split.getLength();
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
}
}
Expand All @@ -705,7 +705,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
.increment(splitLength);
}
mrReader.setSplit(splitObj);
LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
LOG.info(getContext().getInputOutputVertexNames() + " initialized RecordReader from event");
}

private static class MRInputHelpersInternal extends MRInputHelpers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public MRInputLegacy(InputContext inputContext, int numPhysicalInputs) {

@Private
protected void initializeInternal() throws IOException {
LOG.info(getContext().getSourceVertexName() + " MRInputLegacy deferring initialization");
LOG.info(getContext().getInputOutputVertexNames() + " MRInputLegacy deferring initialization");
}

@Private
Expand Down Expand Up @@ -136,7 +136,7 @@ void checkAndAwaitRecordReaderInitialization() throws IOException {
if (splitInfoViaEvents && !inited) {
if (initEvent == null) {
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() +
LOG.debug(getContext().getInputOutputVertexNames() +
" awaiting init event before initializing record reader");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private MultiMRInputConfigBuilder(Configuration conf, Class<?> inputFormat) {
@Override
public List<Event> initialize() throws IOException {
super.initialize();
LOG.info(getContext().getSourceVertexName() + " using newmapreduce API=" + useNewApi +
LOG.info(getContext().getInputOutputVertexNames() + " using newmapreduce API=" + useNewApi +
", numPhysicalInputs=" + getNumPhysicalInputs());
if (getNumPhysicalInputs() == 0) {
getContext().inputIsReady();
Expand Down Expand Up @@ -167,7 +167,7 @@ public void handleEvents(List<Event> inputEvents) throws Exception {
private MRReader initFromEvent(InputDataInformationEvent event) throws IOException {
Objects.requireNonNull(event, "Event must be specified");
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " initializing Reader: " + eventCount.get());
LOG.debug(getContext().getInputOutputVertexNames() + " initializing Reader: " + eventCount.get());
}
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
MRReader reader = null;
Expand All @@ -186,7 +186,7 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti
.getApplicationId().getId(), getContext().getTaskIndex(), getContext()
.getTaskAttemptNumber(), getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", NewSplit: " + split + ", length: " + splitLength);
}
} else {
Expand All @@ -196,15 +196,15 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti
reader = new MRReaderMapred(localJobConf, split,
getContext().getCounters(), inputRecordCounter, getContext());
if (LOG.isDebugEnabled()) {
LOG.debug(getContext().getSourceVertexName() + " split Details -> SplitClass: " +
LOG.debug(getContext().getInputOutputVertexNames() + " split Details -> SplitClass: " +
split.getClass().getName() + ", OldSplit: " + split + ", length: " + splitLength);
}
}
if (splitLength != -1) {
getContext().getCounters().findCounter(TaskCounter.INPUT_SPLIT_LENGTH_BYTES)
.increment(splitLength);
}
LOG.info(getContext().getSourceVertexName() + " initialized RecordReader from event");
LOG.info(getContext().getInputOutputVertexNames() + " initialized RecordReader from event");
return reader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ public String getSourceVertexName() {
return sourceVertexName;
}

@Override
public String getInputOutputVertexNames() {
return String.format("%s -> %s", getSourceVertexName(), getTaskVertexName());
}

@Override
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
Expand All @@ -69,6 +69,7 @@
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezUtilsInternal;

/**
* Responsible for fetching inputs served by the ShuffleHandler for a single
Expand Down Expand Up @@ -194,8 +195,8 @@ public String getHost() {
private final boolean isDebugEnabled = LOG.isDebugEnabled();

protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretManager, Configuration conf,
RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator,
Path lockPath,
Expand All @@ -208,8 +209,8 @@ protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
this.appId = appId;
this.dagIdentifier = dagIdentifier;
this.appId = inputContext.getApplicationId();
this.dagIdentifier = inputContext.getDagIdentifier();
this.pathToAttemptMap = new HashMap<PathPartition, InputAttemptIdentifier>();
this.httpConnectionParams = params;
this.conf = conf;
Expand All @@ -218,7 +219,10 @@ protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
this.sharedFetchEnabled = sharedFetchEnabled;

this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
this.logIdentifier = " fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;

String sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> "
+ TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName());
this.logIdentifier = " fetcher [" + sourceDestNameTrimmed +"] " + fetcherIdentifier;

this.localFs = localFs;
this.localDirAllocator = localDirAllocator;
Expand Down Expand Up @@ -1133,31 +1137,29 @@ public static class FetcherBuilder {
private boolean workAssigned = false;

public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretMgr, Configuration conf, boolean localDiskFetchEnabled,
String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, inputContext,
jobTokenSecretMgr, conf, null, null, null, localDiskFetchEnabled,
false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch);
}

public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, RawLocalFileSystem localFs,
HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretMgr, Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch,
boolean enableFetcherTestingErrors, ObjectRegistry objectRegistry) {
boolean enableFetcherTestingErrors) {
if (enableFetcherTestingErrors) {
this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, inputContext,
jobTokenSecretMgr, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
verifyDiskChecksum, compositeFetch, objectRegistry);
verifyDiskChecksum, compositeFetch);
} else {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, inputContext,
jobTokenSecretMgr, conf, localFs, localDirAllocator,
lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
verifyDiskChecksum, compositeFetch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,16 +35,16 @@ public class FetcherWithInjectableErrors extends Fetcher {
private String srcNameTrimmed;

protected FetcherWithInjectableErrors(FetcherCallback fetcherCallback, HttpConnectionParams params,
FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
FetchedInputAllocator inputManager, InputContext inputContext,
JobTokenSecretManager jobTokenSecretManager, Configuration conf,
RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled,
boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum,
boolean compositeFetch, ObjectRegistry objectRegistry) {
super(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretManager, srcNameTrimmed, conf,
boolean compositeFetch) {
super(fetcherCallback, params, inputManager, inputContext, jobTokenSecretManager, conf,
localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort,
asyncHttp, verifyDiskChecksum, compositeFetch);
this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, objectRegistry);
this.srcNameTrimmed = srcNameTrimmed;
this.fetcherErrorTestingConfig = new FetcherErrorTestingConfig(conf, inputContext.getObjectRegistry());
this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
LOG.info("Initialized FetcherWithInjectableErrors with config: {}", fetcherErrorTestingConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void handleEvent(Event event) throws IOException {

@Override
public void logProgress(boolean updateOnClose) {
LOG.info(inputContext.getSourceVertexName() + ": "
LOG.info(inputContext.getInputOutputVertexNames() + ": "
+ "numDmeEventsSeen=" + numDmeEvents.get()
+ ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get()
+ ", numObsoletionEventsSeen=" + numObsoletionEvents.get()
Expand Down
Loading