Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<name>The Apache Software License,Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ public final ProcessorContext getContext() {
public void abort() {
}

public boolean checkLimitReachedForTez(){
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
@Public
@Evolving
public class MultiMRInput extends MRInputBase {

private static final Logger LOG = LoggerFactory.getLogger(MultiMRInput.class);

public MultiMRInput(InputContext inputContext, int numPhysicalInputs) {
Expand All @@ -69,7 +68,7 @@ public MultiMRInput(InputContext inputContext, int numPhysicalInputs) {
private final AtomicInteger eventCount = new AtomicInteger(0);

private List<MRReader> readers = new LinkedList<MRReader>();

private static String TEZ_VERTEX_NAME = "tez.io.vertex.name";
/**
* Create an {@link MultiMRInputConfigBuilder} to configure a {@link MultiMRInput}</p>
* The preferred usage model is to provide all of the parameters, and use methods to configure
Expand Down Expand Up @@ -171,6 +170,7 @@ private MRReader initFromEvent(InputDataInformationEvent event) throws IOExcepti
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
MRReader reader = null;
JobConf localJobConf = new JobConf(jobConf);
localJobConf.set(TEZ_VERTEX_NAME, getContext().getTaskVertexName());
long splitLength = -1;
if (useNewApi) {
InputSplit split = MRInputUtils.getNewSplitDetailsFromEvent(splitProto, localJobConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {

private static final Logger LOG = LoggerFactory
.getLogger(LogicalIOProcessorRuntimeTask.class);

@VisibleForTesting // All fields non private for testing.
private final String[] localDirs;
/** Responsible for maintaining order of Inputs */
Expand Down Expand Up @@ -405,7 +404,6 @@ public void close() throws Exception {
List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close();
allCloseOutputEvents.add(closeOutputEvents);
}

// Close the Processor.
processorClosed = true;
processor.close();
Expand Down Expand Up @@ -750,16 +748,15 @@ private boolean handleEvent(TezEvent e) {
} catch (Throwable t) {
LOG.warn("Failed to handle event", t);
registerError();
EventMetaData sourceInfo = new EventMetaData(
e.getDestinationInfo().getEventGenerator(),
taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
getTaskAttemptID());
EventMetaData sourceInfo =
new EventMetaData(e.getDestinationInfo().getEventGenerator(), taskSpec.getVertexName(),
e.getDestinationInfo().getEdgeVertexName(), getTaskAttemptID());
setFrameworkCounters();
// Signal such errors as RETRIABLE. The user code has an option to report this as something
// other than retriable before we get control back.
// TODO: Don't catch Throwables.
tezUmbilical.signalFailure(getTaskAttemptID(), TaskFailureType.NON_FATAL,
t, ExceptionUtils.getStackTrace(t), sourceInfo);
tezUmbilical.signalFailure(getTaskAttemptID(), TaskFailureType.NON_FATAL, t, ExceptionUtils.getStackTrace(t),
sourceInfo);
return false;
}
return true;
Expand Down