Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,37 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<executions>
<execution>
<id>src-compile-protoc</id>
<configuration>
<skip>false</skip>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.google.code.maven-replacer-plugin</groupId>
<artifactId>replacer</artifactId>
<executions>
<execution>
<id>replace-generated-sources</id>
<configuration>
<skip>false</skip>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<excludePackageNames>org.apache.hadoop.mapred.proto</excludePackageNames>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class LocalContainerLauncher extends AbstractService implements
private Thread eventHandler;
private byte[] encryptedSpillKey = new byte[] {0};
private BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
new LinkedBlockingQueue<>();

public LocalContainerLauncher(AppContext context,
TaskUmbilicalProtocol umbilical) {
Expand All @@ -115,8 +115,8 @@ public LocalContainerLauncher(AppContext context,
try {
curFC = FileContext.getFileContext(curDir.toURI());
} catch (UnsupportedFileSystemException ufse) {
LOG.error("Local filesystem " + curDir.toURI().toString()
+ " is unsupported?? (should never happen)");
LOG.error("Local filesystem {} is unsupported?? (should never happen)",
curDir.toURI());
}

// Save list of files/dirs that are supposed to be present so can delete
Expand All @@ -126,10 +126,8 @@ public LocalContainerLauncher(AppContext context,
// uberization in order to run correctly).
File[] curLocalFiles = curDir.listFiles();
if (curLocalFiles != null) {
HashSet<File> lf = new HashSet<File>(curLocalFiles.length);
for (int j = 0; j < curLocalFiles.length; ++j) {
lf.add(curLocalFiles[j]);
}
HashSet<File> lf = new HashSet<>(curLocalFiles.length);
Collections.addAll(lf, curLocalFiles);
localizedFiles = Collections.unmodifiableSet(lf);
}

Expand Down Expand Up @@ -157,26 +155,29 @@ public void serviceStart() throws Exception {
// thread context classloader so that it can be used by the event handler
// as well as the subtask runner threads
if (jobClassLoader != null) {
LOG.info("Setting " + jobClassLoader +
" as the context classloader of thread " + eventHandler.getName());
LOG.info("Setting {} as the context classloader of thread {}", jobClassLoader,
eventHandler.getName());
eventHandler.setContextClassLoader(jobClassLoader);
} else {
// note the current TCCL
LOG.info("Context classloader of thread " + eventHandler.getName() +
": " + eventHandler.getContextClassLoader());
LOG.info("Context classloader of thread {}: {}", eventHandler.getName(),
eventHandler.getContextClassLoader());
}
eventHandler.start();
super.serviceStart();
}

public void serviceStop() throws Exception {
if (eventHandler != null) {
eventHandler.interrupt();
}
if (taskRunner != null) {
taskRunner.shutdownNow();
try {
if (eventHandler != null) {
eventHandler.interrupt();
}
if (taskRunner != null) {
taskRunner.shutdownNow();
}
} finally {
super.serviceStop();
}
super.serviceStop();
}

@Override
Expand Down Expand Up @@ -223,7 +224,7 @@ private class EventHandler implements Runnable {
private int finishedSubMaps = 0;

private final Map<TaskAttemptId,Future<?>> futures =
new ConcurrentHashMap<TaskAttemptId,Future<?>>();
new ConcurrentHashMap<>();

EventHandler() {
}
Expand All @@ -235,7 +236,7 @@ public void run() {

// Collect locations of map outputs to give to reduces
final Map<TaskAttemptID, MapOutputFile> localMapFiles =
new HashMap<TaskAttemptID, MapOutputFile>();
new HashMap<>();

// _must_ either run subtasks sequentially or accept expense of new JVMs
// (i.e., fork()), else will get weird failures when maps try to create/
Expand All @@ -244,11 +245,11 @@ public void run() {
try {
event = eventQueue.take();
} catch (InterruptedException e) { // mostly via T_KILL? JOB_KILL?
LOG.warn("Returning, interrupted : " + e);
LOG.warn("Returning, interrupted : {}", String.valueOf(e));
break;
}

LOG.info("Processing the event " + event.toString());
LOG.info("Processing the event {}", event);

if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {

Expand Down Expand Up @@ -295,7 +296,7 @@ public void run() {
TaskAttemptId taId = event.getTaskAttemptID();
Future<?> future = futures.remove(taId);
if (future != null) {
LOG.info("canceling the task attempt " + taId);
LOG.info("canceling the task attempt {}", taId);
future.cancel(true);
}

Expand Down Expand Up @@ -378,14 +379,12 @@ private void runTask(ContainerRemoteLaunchEvent launchEv,
// if umbilical itself barfs (in error-handler of runSubMap()),
// we're pretty much hosed, so do what YarnChild main() does
// (i.e., exit clumsily--but can never happen, so no worries!)
LOG.error("oopsie... this can never happen: "
+ StringUtils.stringifyException(ioe));
LOG.error("oopsie... this can never happen: {}", StringUtils.stringifyException(ioe));
ExitUtil.terminate(-1);
} finally {
// remove my future
if (futures.remove(attemptID) != null) {
LOG.info("removed attempt " + attemptID +
" from the futures to keep track of");
LOG.info("removed attempt {} from the futures to keep track of", attemptID);
}
}
}
Expand Down Expand Up @@ -460,8 +459,8 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
// checking event queue is a tad wacky...but could enforce ordering
// (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
// doesn't send reduce event until maps all done]
LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+ attemptID + "), but not yet finished with maps");
LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ({}),"
+ " but not yet finished with maps", attemptID);
throw new RuntimeException();
}

Expand All @@ -487,16 +486,15 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
throw new RuntimeException();

} catch (Exception exception) {
LOG.warn("Exception running local (uberized) 'child' : "
+ StringUtils.stringifyException(exception));
LOG.warn("Exception running local (uberized) 'child' : {}",
StringUtils.stringifyException(exception));
try {
if (task != null) {
// do cleanup for the task
task.taskCleanup(umbilical);
}
} catch (Exception e) {
LOG.info("Exception cleaning up: "
+ StringUtils.stringifyException(e));
LOG.info("Exception cleaning up: {}", StringUtils.stringifyException(e));
}
// Report back any failures, for diagnostic purposes
umbilical.reportDiagnosticInfo(classicAttemptID,
Expand Down Expand Up @@ -543,9 +541,8 @@ private void relocalize() {
deleted = false;
}
if (!deleted) {
LOG.warn("Unable to delete unexpected local file/dir "
+ curLocalFiles[j].getName()
+ ": insufficient permissions?");
LOG.warn("Unable to delete unexpected local file/dir {}: insufficient permissions?",
curLocalFiles[j].getName());
}
}
}
Expand Down Expand Up @@ -576,9 +573,8 @@ protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
Path reduceInIndex = new Path(reduceIn.toString() + ".index");
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming map output file for task attempt "
+ mapId.toString() + " from original location " + mapOut.toString()
+ " to destination " + reduceIn.toString());
LOG.debug("Renaming map output file for task attempt {} from original location {}"
+ " to destination {}", mapId, mapOut, reduceIn);
}
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
Expand Down
Loading
Loading