Deleting stale files

commit f7e28126b4ddc796f3d81c5e0502f4c7aa0d6086 (1 parent: fcc1cd0)
Ramkumar Vadali authored
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestMemBasedLoadManager.java (73 lines deleted)
@@ -1,73 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-
-package org.apache.hadoop.mapred;
-
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-/**
- *
- */
-public class TestMemBasedLoadManager extends TestCase {
-
- public void testCanLaunchTask() {
- // Prepare MemBasedLoadManager to test
- MemBasedLoadManager memLoadMgr = new MemBasedLoadManager();
- memLoadMgr.start();
- Configuration conf = new Configuration();
- String[] affectedUsers = {"user1", "user2", "user3"};
- conf.setStrings(MemBasedLoadManager.AFFECTED_USERS_STRING, affectedUsers);
- conf.setLong(MemBasedLoadManager.RESERVED_PHYSICAL_MEMORY_ON_TT_STRING,
- 4 * 1024L);
- memLoadMgr.setConf(conf);
- assertFalse("MemBasedLoadManager should not affect all user if the list of "
- + "affected users is given", memLoadMgr.isAffectAllUsers());
-
- // Prepare job to test
- JobConf jobConf = new JobConf();
- JobInProgress job = new JobInProgress(
- JobID.forName("job_200909090000_0001"), jobConf, null);
- job.profile = new JobProfile();
- job.profile.user = "user1";
-
- // Prepare TaskTracker to test
- String launchMsg = "Memory is under limit. Task should be able to launch.";
- String failedMsg = "Memory exceeds limit. Task should not launch.";
-
- // Not enough free memory on the TaskTracker
- TaskTrackerStatus tracker = new TaskTrackerStatus();
- tracker.getResourceStatus().
- setAvailablePhysicalMemory(3 * 1024 * 1024 * 1024L);
- assertFalse(failedMsg, memLoadMgr.canLaunchTask(tracker, job,
- TaskType.MAP));
- assertFalse(failedMsg, memLoadMgr.canLaunchTask(tracker, job,
- TaskType.REDUCE));
-
- // Enough memory on the TaskTracker
- tracker.getResourceStatus().
- setAvailablePhysicalMemory(5 * 1024 * 1024 * 1024L);
- assertTrue(launchMsg, memLoadMgr.canLaunchTask(tracker, job, TaskType.MAP));
- assertTrue(launchMsg, memLoadMgr.canLaunchTask(tracker, job,
- TaskType.REDUCE));
-
- // Switch to a user that is not affected
- job.profile.user = "user6";
- tracker.getResourceStatus().
- setAvailablePhysicalMemory(1 * 1024 * 1024 * 1024L);
- assertTrue(launchMsg, memLoadMgr.canLaunchTask(tracker, job, TaskType.MAP));
- assertTrue(launchMsg, memLoadMgr.canLaunchTask(tracker, job,
- TaskType.REDUCE));
-
- // Set the affect everyone property
- memLoadMgr.setAffectAllUsers(true);
- tracker.getResourceStatus().
- setAvailablePhysicalMemory(1 * 1024 * 1024 * 1024L);
- assertFalse(failedMsg, memLoadMgr.canLaunchTask(tracker, job,
- TaskType.MAP));
- assertFalse(failedMsg, memLoadMgr.canLaunchTask(tracker, job,
- TaskType.REDUCE));
- }
-}
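
Note: the deleted test above exercised MemBasedLoadManager as a fair scheduler load-manager plug-in. Purely as a rough sketch, not part of this commit: the "mapred.fairscheduler.loadmanager" key is the fair scheduler's documented extension point, and the two MemBasedLoadManager constants are reused from the test because their underlying property names are not shown in this diff. Wiring up such a check could look roughly like:

package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;

public class MemLoadManagerConfigSketch {
  /** Builds a configuration that plugs the memory-based load manager into the
   *  fair scheduler. Illustrative only; names follow the deleted test. */
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Fair scheduler extension point for choosing a LoadManager implementation.
    conf.set("mapred.fairscheduler.loadmanager",
             "org.apache.hadoop.mapred.MemBasedLoadManager");
    // Restrict the memory check to specific users, as the test above did.
    conf.setStrings(MemBasedLoadManager.AFFECTED_USERS_STRING,
                    "user1", "user2", "user3");
    // Reserve 4 * 1024 units of physical memory per tasktracker
    // (the deleted test passes this value, presumably megabytes).
    conf.setLong(MemBasedLoadManager.RESERVED_PHYSICAL_MEMORY_ON_TT_STRING,
                 4 * 1024L);
    return conf;
  }
}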
src/test/org/apache/hadoop/ipc/TestRPCCompatibility.java (214 lines deleted)
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC.VersionIncompatible;
-import org.apache.hadoop.net.NetUtils;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/** Unit test for supporting cross-version RPCs. */
-public class TestRPCCompatibility {
- private static final String ADDRESS = "0.0.0.0";
- private static InetSocketAddress addr;
- private static Server server;
- private VersionedProtocol proxy;
-
- public static final Log LOG =
- LogFactory.getLog(TestRPCCompatibility.class);
-
- private static Configuration conf = new Configuration();
-
- public interface TestProtocol0 extends VersionedProtocol {
- public static final long versionID = 0L;
- void ping() throws IOException;
- }
-
- public interface TestProtocol1 extends TestProtocol0 {
- public static final long versionID = 1L;
-
- String echo(String value) throws IOException;
- }
-
- public interface TestProtocol2 extends TestProtocol1 {
- public static final long versionID = 2L;
- int add(int v1, int v2);
- }
-
- public interface TestProtocol3 extends TestProtocol2 {
- public static final long versionID = 3L;
- int echo(int value) throws IOException;
- }
-
- public static class TestImpl implements TestProtocol2 {
- int fastPingCounter = 0;
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion)
- throws RPC.VersionIncompatible, IOException {
- // Although version 0 is compatible, it is too old,
- // so disallow this version of the client
- if (clientVersion == TestProtocol0.versionID ) {
- throw new RPC.VersionIncompatible(
- this.getClass().getName(), clientVersion, versionID);
- }
- return TestProtocol2.versionID;
- }
-
- @Override
- public String echo(String value) { return value; }
-
- @Override
- public int add(int v1, int v2) { return v1 + v2; }
-
- @Override
- public void ping() { return; }
- }
-
- @BeforeClass
- public static void setup() throws IOException {
- // create a server with two handlers
- server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf);
- server.start();
- addr = NetUtils.getConnectAddress(server);
- }
-
- @AfterClass
- public static void tearDown() throws IOException {
- if (server != null) {
- server.stop();
- }
- }
-
- @After
- public void shutdownProxy() {
- if (proxy != null) {
- RPC.stopProxy(proxy);
- }
- }
-
- @Test
- public void testIncompatibleOldClient() throws Exception {
- try {
- proxy = RPC.getProxy(
- TestProtocol1.class, TestProtocol0.versionID, addr, conf);
- fail("Should not be able to connect to the server");
- } catch (RemoteException re) {
- assertEquals(RPC.VersionIncompatible.class.getName(), re.getClassName());
- }
- }
-
- @Test
- public void testCompatibleOldClient() throws Exception {
- try {
- proxy = RPC.getProxy(
- TestProtocol1.class, TestProtocol1.versionID, addr, conf);
- fail("Expect to get a version mismatch exception");
- } catch(RPC.VersionMismatch e) {
- assertEquals(TestProtocol2.versionID, e.getServerVersion());
- proxy = e.getProxy();
- }
-
- TestProtocol1 proxy1 = (TestProtocol1)proxy;
- assertEquals("hello", proxy1.echo("hello")); // test equal
- }
-
- @Test
- public void testEqualVersionClient() throws Exception {
- proxy = RPC.getProxy(
- TestProtocol2.class, TestProtocol2.versionID, addr, conf);
-
- TestProtocol2 proxy2 = (TestProtocol2)proxy;
- assertEquals(3, proxy2.add(1, 2));
- assertEquals("hello", proxy2.echo("hello"));
- proxy2.ping();
- }
-
- private class Version3Client implements TestProtocol3 {
-
- private TestProtocol3 proxy3;
- private long serverVersion = versionID;
-
- private Version3Client() throws IOException {
- try {
- proxy = RPC.getProxy(
- TestProtocol3.class, TestProtocol3.versionID, addr, conf);
- } catch (RPC.VersionMismatch e) {
- serverVersion = e.getServerVersion();
- if (serverVersion != TestProtocol2.versionID) {
- throw new RPC.VersionIncompatible(TestProtocol3.class.getName(),
- versionID, serverVersion);
- }
- proxy = e.getProxy();
- }
- proxy3 = (TestProtocol3) proxy;
- }
-
- @Override
- public int echo(int value) throws IOException, NumberFormatException {
- if (serverVersion == versionID) { // same version
- return proxy3.echo(value); // use version 3 echo int
- } else { // server is version 2
- return Integer.parseInt(proxy3.echo(String.valueOf(value)));
- }
- }
-
- @Override
- public int add(int v1, int v2) {
- // TODO Auto-generated method stub
- return proxy3.add(v1, v2);
- }
-
- @Override
- public String echo(String value) throws IOException {
- return proxy3.echo(value);
- }
-
- @Override
- public void ping() throws IOException {
- proxy3.ping();
- }
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion)
- throws VersionIncompatible, IOException {
- return versionID;
- }
- }
-
- @Test
- public void testCompatibleNewClient() throws Exception {
- Version3Client client = new Version3Client();
- assertEquals(3, client.add(1, 2));
- assertEquals("hello", client.echo("hello"));
- assertEquals(3, client.echo(3));
- client.ping();
- }
-}
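
For reference, the deleted Version3Client above captures the client-side pattern for cross-version RPC: ask for the newest protocol version and, when the server answers with RPC.VersionMismatch, either keep the compatible proxy carried by the exception or give up with RPC.VersionIncompatible. A condensed sketch of that pattern, using only the RPC calls that appear in the test (the helper class and method names here are illustrative, not from the commit):

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;

public class VersionFallbackSketch {
  /**
   * Requests a proxy at the client's newest protocol version and falls back
   * to the proxy offered by the server when the versions differ, mirroring
   * the deleted Version3Client.
   */
  public static VersionedProtocol getCompatibleProxy(
      Class<? extends VersionedProtocol> protocol,
      long newestVersion, long oldestSupportedVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
    try {
      return RPC.getProxy(protocol, newestVersion, addr, conf);
    } catch (RPC.VersionMismatch e) {
      long serverVersion = e.getServerVersion();
      if (serverVersion < oldestSupportedVersion) {
        // Server is older than anything this client can talk to.
        throw new RPC.VersionIncompatible(
            protocol.getName(), newestVersion, serverVersion);
      }
      // The mismatch exception carries a proxy usable at the server's version.
      return e.getProxy();
    }
  }
}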
src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (571 lines deleted)
@@ -1,571 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.UtilsForTests;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import junit.framework.TestCase;
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker
- * should be able to continue running the previously running jobs and also
- * recover previously submitted jobs.
- */
-public class TestJobTrackerRestart extends TestCase {
- static final Path testDir =
- new Path(System.getProperty("test.build.data","/tmp"),
- "jt-restart-testing");
- final Path inDir = new Path(testDir, "input");
- static final Path shareDir = new Path(testDir, "share");
- final Path outputDir = new Path(testDir, "output");
- private static int numJobsSubmitted = 0;
-
- /**
- * Return the job conf configured with the priorities and mappers as passed.
- * @param conf The default conf
- * @param priorities priorities for the jobs
- * @param numMaps number of maps for the jobs
- * @param numReds number of reducers for the jobs
- * @param outputDir output dir
- * @param inDir input dir
- * @param mapSignalFile file name that acts as a signal for the maps
- * @param reduceSignalFile file name that acts as a signal for the reducers
- * @return an array of jobconfs configured as needed
- * @throws IOException
- */
- private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
- int[] numMaps, int[] numReds,
- Path outputDir, Path inDir,
- String mapSignalFile, String reduceSignalFile)
- throws IOException {
- JobConf[] jobs = new JobConf[priorities.length];
- for (int i = 0; i < jobs.length; ++i) {
- jobs[i] = new JobConf(conf);
- Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
- UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir,
- numMaps[i], numReds[i], "jt restart test job", mapSignalFile,
- reduceSignalFile);
- jobs[i].setJobPriority(priorities[i]);
- }
- return jobs;
- }
-
- /**
- * Clean up the signals.
- */
- private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
- // Delete the map signal file
- fileSys.delete(new Path(getMapSignalFile(dir)), false);
- // Delete the reduce signal file
- fileSys.delete(new Path(getReduceSignalFile(dir)), false);
- }
-
- /**
- * Tests the jobtracker with restart-recovery turned off.
- * Submit a job with normal priority, maps = 2, reducers = 0
- *
- * Wait for the job to complete 50%
- *
- * Restart the jobtracker with recovery turned off
- *
- * Check if the job is missing
- */
- public void testRestartWithoutRecovery(MiniDFSCluster dfs,
- MiniMRCluster mr)
- throws IOException {
- // III. Test a job with waiting mapper and recovery turned off
-
- FileSystem fileSys = dfs.getFileSystem();
-
- cleanUp(fileSys, shareDir);
-
- JobConf newConf = getJobs(mr.createJobConf(),
- new JobPriority[] {JobPriority.NORMAL},
- new int[] {2}, new int[] {0},
- outputDir, inDir,
- getMapSignalFile(shareDir),
- getReduceSignalFile(shareDir))[0];
-
- JobClient jobClient = new JobClient(newConf);
- RunningJob job = jobClient.submitJob(newConf);
- JobID id = job.getID();
-
- // make sure that the job is 50% completed
- while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
- UtilsForTests.waitFor(100);
- }
-
- mr.stopJobTracker();
-
- // Turn off the recovery
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- false);
-
- // Wait for a minute before restarting the jobtracker
- UtilsForTests.waitFor(60 * 1000);
-
- mr.startJobTracker();
-
- // Signal the tasks
- UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
- getReduceSignalFile(shareDir));
-
- // Wait for the JT to be ready
- UtilsForTests.waitForJobTracker(jobClient);
-
- UtilsForTests.waitTillDone(jobClient);
-
- // The submitted job should not exist
- assertTrue("Submitted job was detected with recovery disabled",
- UtilsForTests.getJobStatus(jobClient, id) == null);
- }
-
- /** Tests a job on jobtracker with restart-recovery turned on.
- * Preparation :
- * - Configure a job with
- * - num-maps : 50
- * - num-reducers : 1
- * - Configure the cluster to run 1 reducer
- * - Lower the history file block size and buffer
- *
- * Wait for the job to complete 50%. Note that the job is configured to
- * use {@link HalfWaitingMapper} and {@link WaitingReducer}, so the job will
- * eventually wait at 50%.
- *
- * Make a note of the following things
- * - Task completion events
- * - Cluster status
- * - Task Reports
- * - Job start time
- *
- * Restart the jobtracker
- *
- * Wait for job to finish all the maps and note the TaskCompletion events at
- * the tracker.
- *
- * Wait for all the jobs to finish and note the following
- * - New task completion events at the jobtracker
- * - Task reports
- * - Cluster status
- *
- * Check for the following
- * - Task completion events for recovered tasks should match
- * - Task completion events at the tasktracker and the restarted
- * jobtracker should be the same
- * - Cluster status should be fine.
- * - Task Reports for recovered tasks should match
- * Checks
- * - start time
- * - finish time
- * - counters
- * - http-location
- * - task-id
- * - Job start time should match
- * - Check if the counters can be accessed
- * - Check if the history files are (re)named properly
- */
- public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs,
- MiniMRCluster mr)
- throws IOException {
- // II. Test a tasktracker with waiting mapper and recovery turned on.
- // Ideally the tracker should SYNC with the new/restarted jobtracker
-
- FileSystem fileSys = dfs.getFileSystem();
- final int numMaps = 50;
- final int numReducers = 1;
-
-
- cleanUp(fileSys, shareDir);
-
- JobConf newConf = getJobs(mr.createJobConf(),
- new JobPriority[] {JobPriority.NORMAL},
- new int[] {numMaps}, new int[] {numReducers},
- outputDir, inDir,
- getMapSignalFile(shareDir),
- getReduceSignalFile(shareDir))[0];
-
- JobClient jobClient = new JobClient(newConf);
- RunningJob job = jobClient.submitJob(newConf);
- JobID id = job.getID();
-
- // change the job priority
- mr.setJobPriority(id, JobPriority.HIGH);
-
- mr.initializeJob(id);
-
- // make sure that at least one reducer is spawned
- while (jobClient.getClusterStatus().getReduceTasks() == 0) {
- UtilsForTests.waitFor(100);
- }
-
- while(true) {
- // Since we are using a half waiting mapper, maps should be stuck at 50%
- TaskCompletionEvent[] trackerEvents =
- mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
- .getMapTaskCompletionEvents();
- if (trackerEvents.length < numMaps / 2) {
- UtilsForTests.waitFor(1000);
- } else {
- break;
- }
- }
-
- TaskCompletionEvent[] prevEvents =
- mr.getTaskCompletionEvents(id, 0, numMaps);
- TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
- TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
- ClusterStatus prevStatus = jobClient.getClusterStatus();
-
- mr.stopJobTracker();
-
- // Turn on the recovery
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
-
- // Wait for a minute before restarting the jobtracker
- UtilsForTests.waitFor(60 * 1000);
-
- mr.startJobTracker();
-
- // Signal the map tasks
- UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
- getReduceSignalFile(shareDir));
-
- // Wait for the JT to be ready
- UtilsForTests.waitForJobTracker(jobClient);
-
- int numToMatch = mr.getNumEventsRecovered() / 2;
-
- // make sure that the maps are completed
- while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
- UtilsForTests.waitFor(100);
- }
-
- // Get the new jobtrackers events
- TaskCompletionEvent[] jtEvents =
- mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
-
- // Test if all the events that were recovered match exactly
- testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
-
- // Check the task reports
- // The reports should match exactly if the attempts are the same
- TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
- TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
- testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
- testTaskReports(prevSetupReports, afterSetupReports, 1);
-
- // check the job priority
- assertEquals("Job priority change is not reflected",
- JobPriority.HIGH, mr.getJobPriority(id));
-
- List<TaskCompletionEvent> jtMapEvents =
- new ArrayList<TaskCompletionEvent>();
- for (TaskCompletionEvent tce : jtEvents) {
- if (tce.isMapTask()) {
- jtMapEvents.add(tce);
- }
- }
-
- TaskCompletionEvent[] trackerEvents;
- while(true) {
- // Wait for the tracker to pull all the map events
- trackerEvents =
- mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
- .getMapTaskCompletionEvents();
- if (trackerEvents.length < jtMapEvents.size()) {
- UtilsForTests.waitFor(1000);
- } else {
- break;
- }
- }
-
- // Signal the reduce tasks
- UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
- getReduceSignalFile(shareDir));
-
- UtilsForTests.waitTillDone(jobClient);
-
- testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]),
- trackerEvents, true, -1);
-
- // validate the history file
- TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
- TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);
-
- // check that the cluster status is sane
- ClusterStatus status = jobClient.getClusterStatus();
- assertTrue("Cluster status is insane",
- checkClusterStatusOnCompletion(status, prevStatus));
- }
-
- /**
- * Matches specified number of task reports.
- * @param source the reports to be matched
- * @param target reports to match with
- * @param numToMatch num reports to match
- */
- private void testTaskReports(TaskReport[] source, TaskReport[] target,
- int numToMatch) {
- for (int i = 0; i < numToMatch; ++i) {
- // Check if the task reports were recovered correctly
- assertTrue("Task reports for same attempt has changed",
- source[i].equals(target[i]));
- }
- }
-
- /**
- * Matches the task completion events.
- * @param source the events to be matched
- * @param target events to match with
- * @param fullMatch whether to match the events completely or partially
- * @param numToMatch number of events to match in case full match is not
- * desired
- */
- private void testTaskCompletionEvents(TaskCompletionEvent[] source,
- TaskCompletionEvent[] target,
- boolean fullMatch,
- int numToMatch) {
- // Check if the event list size matches
- // The lengths should match only in case of a full match
- if (fullMatch) {
- assertEquals("Map task completion events mismatch",
- source.length, target.length);
- numToMatch = source.length;
- }
- // Check if the events match
- for (int i = 0; i < numToMatch; ++i) {
- if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())){
- assertTrue("Map task completion events ordering mismatch",
- source[i].equals(target[i]));
- }
- }
- }
-
- private boolean checkClusterStatusOnCompletion(ClusterStatus status,
- ClusterStatus prevStatus) {
- return status.getJobTrackerState() == prevStatus.getJobTrackerState()
- && status.getMapTasks() == 0
- && status.getReduceTasks() == 0;
- }
-
- /** Committer with setup waiting
- */
- static class CommitterWithDelaySetup extends FileOutputCommitter {
- @Override
- public void setupJob(JobContext context) throws IOException {
- FileSystem fs = FileSystem.get(context.getConfiguration());
- while (true) {
- if (fs.exists(shareDir)) {
- break;
- }
- UtilsForTests.waitFor(100);
- }
- super.cleanupJob(context);
- }
- }
-
- /** Tests a job on jobtracker with restart-recovery turned on and empty
- * jobhistory file.
- * Preparation :
- * - Configure a job with
- * - num-maps : 0 (long waiting setup)
- * - num-reducers : 0
- *
- * Check if the job succeeds after the restart.
- *
- * Assumes that map slots are given to the setup task first.
- */
- public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs,
- MiniMRCluster mr)
- throws IOException {
- mr.startTaskTracker(null, null, 1, 1);
- FileSystem fileSys = dfs.getFileSystem();
-
- cleanUp(fileSys, shareDir);
- cleanUp(fileSys, inDir);
- cleanUp(fileSys, outputDir);
-
- JobConf conf = mr.createJobConf();
- conf.setNumReduceTasks(0);
- conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
- fileSys.delete(outputDir, false);
- RunningJob job1 =
- UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
-
- conf.setNumReduceTasks(0);
- conf.setOutputCommitter(CommitterWithDelaySetup.class);
- Path inDir2 = new Path(testDir, "input2");
- fileSys.mkdirs(inDir2);
- Path outDir2 = new Path(testDir, "output2");
- fileSys.delete(outDir2, false);
- JobConf newConf = getJobs(mr.createJobConf(),
- new JobPriority[] {JobPriority.NORMAL},
- new int[] {10}, new int[] {0},
- outDir2, inDir2,
- getMapSignalFile(shareDir),
- getReduceSignalFile(shareDir))[0];
-
- JobClient jobClient = new JobClient(newConf);
- RunningJob job2 = jobClient.submitJob(newConf);
- JobID id = job2.getID();
-
- /*RunningJob job2 =
- UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
-
- JobID id = job2.getID();*/
- JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
-
- mr.getJobTrackerRunner().getJobTracker().initJob(jip);
-
- // find out the history filename
- String history =
- JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
- Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
- // get the conf file name
- String parts[] = history.split("_");
- // jobtracker-hostname_jobtracker-identifier_conf.xml
- String jobUniqueString = parts[0] + "_" + parts[1] + "_" + id;
- Path confPath = new Path(historyPath.getParent(), jobUniqueString + "_conf.xml");
-
- // make sure that setup is launched
- while (jip.runningMaps() == 0) {
- UtilsForTests.waitFor(100);
- }
-
- id = job1.getID();
- jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
-
- mr.getJobTrackerRunner().getJobTracker().initJob(jip);
-
- // make sure that cleanup is launched and is waiting
- while (!jip.isCleanupLaunched()) {
- UtilsForTests.waitFor(100);
- }
-
- mr.stopJobTracker();
-
- // delete the history file .. just to be safe.
- FileSystem historyFS = historyPath.getFileSystem(conf);
- historyFS.delete(historyPath, false);
- historyFS.create(historyPath).close(); // create an empty file
-
-
- UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1);
-
- // Turn on the recovery
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
-
- mr.startJobTracker();
-
- job1.waitForCompletion();
- job2.waitForCompletion();
-
- // check if the old files are deleted
- assertFalse("Old jobhistory file is not deleted", historyFS.exists(historyPath));
- assertFalse("Old jobconf file is not deleted", historyFS.exists(confPath));
- }
-
- public void testJobTrackerRestart() throws IOException {
- String namenode = null;
- MiniDFSCluster dfs = null;
- MiniMRCluster mr = null;
- FileSystem fileSys = null;
-
- try {
- Configuration conf = new Configuration();
- conf.setBoolean("dfs.replication.considerLoad", false);
- dfs = new MiniDFSCluster(conf, 1, true, null, null);
- dfs.waitActive();
- fileSys = dfs.getFileSystem();
-
- // clean up
- fileSys.delete(testDir, true);
-
- if (!fileSys.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
- }
-
- // Write the input file
- UtilsForTests.writeFile(dfs.getNameNode(), conf,
- new Path(inDir + "/file"), (short)1);
-
- dfs.startDataNodes(conf, 1, true, null, null, null, null);
- dfs.waitActive();
-
- namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
- + (dfs.getFileSystem()).getUri().getPort();
-
- // Make sure that jobhistory leads to a proper job restart
- // So keep the blocksize and the buffer size small
- JobConf jtConf = new JobConf();
- jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
- jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
- jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
- jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
- jtConf.setBoolean("mapred.acls.enabled", true);
- // get the user group info
- UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
- jtConf.set("mapred.queue.default.acl-submit-job", ugi.getUserName());
-
- mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
-
- // Test the tasktracker SYNC
- testTaskEventsAndReportsWithRecovery(dfs, mr);
-
- // Test jobtracker with restart-recovery turned off
- testRestartWithoutRecovery(dfs, mr);
-
- // test recovery with empty file
- testJobRecoveryWithEmptyHistory(dfs, mr);
- } finally {
- if (mr != null) {
- try {
- mr.shutdown();
- } catch (Exception e) {}
- }
- if (dfs != null) {
- try {
- dfs.shutdown();
- } catch (Exception e) {}
- }
- }
- }
-
- private static String getMapSignalFile(Path dir) {
- return (new Path(dir, "jt-restart-map-signal")).toString();
- }
-
- private static String getReduceSignalFile(Path dir) {
- return (new Path(dir, "jt-restart-reduce-signal")).toString();
- }
-
- public static void main(String[] args) throws IOException {
- new TestJobTrackerRestart().testJobTrackerRestart();
- }
-}
src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (177 lines deleted)
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.TestJobTrackerRestart;
-
-import junit.framework.TestCase;
-import java.io.*;
-
-/**
- * This test checks if the jobtracker can detect and recover a tracker that was
- * lost while the jobtracker was down.
- */
-public class TestJobTrackerRestartWithLostTracker extends TestCase {
- final Path testDir = new Path("/jt-restart-lost-tt-testing");
- final Path inDir = new Path(testDir, "input");
- final Path shareDir = new Path(testDir, "share");
- final Path outputDir = new Path(testDir, "output");
-
- private JobConf configureJob(JobConf conf, int maps, int reduces,
- String mapSignal, String redSignal)
- throws IOException {
- UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,
- maps, reduces, "test-jobtracker-restart-with-lost-tt",
- mapSignal, redSignal);
- return conf;
- }
-
- public void testRecoveryWithLostTracker(MiniDFSCluster dfs,
- MiniMRCluster mr)
- throws IOException {
- FileSystem fileSys = dfs.getFileSystem();
- JobConf jobConf = mr.createJobConf();
- int numMaps = 50;
- int numReds = 1;
- String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
- String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
-
- // Configure the jobs
- JobConf job = configureJob(jobConf, numMaps, numReds,
- mapSignalFile, redSignalFile);
-
- fileSys.delete(shareDir, true);
-
- // Submit a master job
- JobClient jobClient = new JobClient(job);
- RunningJob rJob = jobClient.submitJob(job);
- JobID id = rJob.getID();
-
- // wait for the job to be inited
- mr.initializeJob(id);
-
- // Make sure that the master job is 50% completed
- while (UtilsForTests.getJobStatus(jobClient, id).mapProgress()
- < 0.5f) {
- UtilsForTests.waitFor(100);
- }
-
- // Kill the jobtracker
- mr.stopJobTracker();
-
- // Signal the maps to complete
- UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
-
- // Enable recovery on restart
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
-
- // Kill the 2nd tasktracker
- mr.stopTaskTracker(1);
-
- // Wait for a minute before restarting the jobtracker
- UtilsForTests.waitFor(60 * 1000);
-
- // Restart the jobtracker
- mr.startJobTracker();
-
- // Check if the jobs are still running
-
- // Wait for the JT to be ready
- UtilsForTests.waitForJobTracker(jobClient);
-
- // Signal the reducers to complete
- UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile,
- redSignalFile);
-
- UtilsForTests.waitTillDone(jobClient);
-
- // Check if the tasks on the lost tracker got re-executed
- assertEquals("Tracker killed while the jobtracker was down did not get lost "
- + "upon restart",
- jobClient.getClusterStatus().getTaskTrackers(), 1);
-
- // validate the history file
- TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
- TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
- }
-
- public void testRestartWithLostTracker() throws IOException {
- String namenode = null;
- MiniDFSCluster dfs = null;
- MiniMRCluster mr = null;
- FileSystem fileSys = null;
-
- try {
- Configuration conf = new Configuration();
- conf.setBoolean("dfs.replication.considerLoad", false);
- dfs = new MiniDFSCluster(conf, 1, true, null, null);
- dfs.waitActive();
- fileSys = dfs.getFileSystem();
-
- // clean up
- fileSys.delete(testDir, true);
-
- if (!fileSys.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
- }
-
- // Write the input file
- UtilsForTests.writeFile(dfs.getNameNode(), conf,
- new Path(inDir + "/file"), (short)1);
-
- dfs.startDataNodes(conf, 1, true, null, null, null, null);
- dfs.waitActive();
-
- namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
- + (dfs.getFileSystem()).getUri().getPort();
-
- // Make sure that jobhistory leads to a proper job restart
- // So keep the blocksize and the buffer size small
- JobConf jtConf = new JobConf();
- jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
- jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
- jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
- jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
- jtConf.setInt("mapred.reduce.copy.backoff", 4);
-
- mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
-
- // Test Lost tracker case
- testRecoveryWithLostTracker(dfs, mr);
- } finally {
- if (mr != null) {
- try {
- mr.shutdown();
- } catch (Exception e) {}
- }
- if (dfs != null) {
- try {
- dfs.shutdown();
- } catch (Exception e) {}
- }
- }
- }
-
- public static void main(String[] args) throws IOException {
- new TestJobTrackerRestartWithLostTracker().testRestartWithLostTracker();
- }
-}
src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (280 lines deleted)
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-
-import junit.framework.TestCase;
-import java.io.*;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * This test checks the jobtracker in safe mode. In safe mode the jobtracker,
- * upon restart, does not schedule any new tasks and waits for the (old)
- * trackers to join back.
- */
-public class TestJobTrackerSafeMode extends TestCase {
- final Path testDir =
- new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
- final Path inDir = new Path(testDir, "input");
- final Path shareDir = new Path(testDir, "share");
- final Path outputDir = new Path(testDir, "output");
- final int numDir = 1;
- final int numTrackers = 2;
-
- private static final Log LOG =
- LogFactory.getLog(TestJobTrackerSafeMode.class);
-
- private JobConf configureJob(JobConf conf, int maps, int reduces,
- String mapSignal, String redSignal)
- throws IOException {
- UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,
- maps, reduces, "test-jobtracker-safemode",
- mapSignal, redSignal);
- return conf;
- }
-
- /**
- * Tests the jobtracker's safemode. The test is as follows :
- * - starts a cluster with 2 trackers
- * - submits a job with large (40) maps to make sure that all the trackers
- * are logged to the job history
- * - wait for the job to be 50% done
- * - stop the jobtracker
- * - wait for the trackers to be done with all the tasks
- * - kill a task tracker
- * - start the jobtracker
- * - start 2 more trackers
- * - now check that while all the trackers are being detected (or lost), the
- * scheduling window stays closed
- * - check that after all the trackers are recovered, scheduling is opened
- */
- private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr)
- throws IOException {
- FileSystem fileSys = dfs.getFileSystem();
- JobConf jobConf = mr.createJobConf();
- String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
- String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
- JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
- int numTracker = jobtracker.getClusterStatus(false).getTaskTrackers();
-
- // Configure the jobs
- JobConf job = configureJob(jobConf, 40, 0, mapSignalFile, redSignalFile);
-
- fileSys.delete(shareDir, true);
-
- // Submit a master job
- JobClient jobClient = new JobClient(job);
- RunningJob rJob = jobClient.submitJob(job);
- JobID id = rJob.getID();
-
- // wait for the job to be inited
- mr.initializeJob(id);
-
- // Make sure that the master job is 50% completed
- while (UtilsForTests.getJobStatus(jobClient, id).mapProgress()
- < 0.5f) {
- LOG.info("Waiting for the job to be 50% done");
- UtilsForTests.waitFor(100);
- }
-
- // Kill the jobtracker
- mr.stopJobTracker();
-
- // Enable recovery on restart
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
-
- // Signal the maps to complete
- UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
-
- // Signal the reducers to complete
- UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile,
- redSignalFile);
-
- // wait for the tasks to complete at the tracker
- Set<String> trackers = new HashSet<String>();
- for (int i = 0 ; i < numTracker; ++i) {
- TaskTracker t = mr.getTaskTrackerRunner(i).getTaskTracker();
- trackers.add(t.getName());
- int runningCount = t.getRunningTaskStatuses().size();
- while (runningCount != 0) {
- LOG.info("Waiting for tracker " + t.getName() + " to stabilize");
- UtilsForTests.waitFor(100);
- runningCount = 0;
- for (TaskStatus status : t.getRunningTaskStatuses()) {
- if (status.getIsMap()
- && (status.getRunState() == TaskStatus.State.UNASSIGNED
- || status.getRunState() == TaskStatus.State.RUNNING)) {
- ++runningCount;
- }
- }
- }
- }
-
- LOG.info("Trackers have stabilized");
-
- // Kill a tasktracker
- int trackerToKill = --numTracker;
- TaskTracker t = mr.getTaskTrackerRunner(trackerToKill).getTaskTracker();
-
- trackers.remove(t.getName()); // remove this from the set to check
-
- Set<String> lostTrackers = new HashSet<String>();
- lostTrackers.add(t.getName());
-
- // get the attempt-id's to ignore
- // stop the tracker
- LOG.info("Stopping tracker : " + t.getName());
- mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
- mr.stopTaskTracker(trackerToKill);
-
- // Restart the jobtracker
- mr.startJobTracker();
-
- // Wait for the JT to be ready
- UtilsForTests.waitForJobTracker(jobClient);
-
- jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
- // Start a tracker
- LOG.info("Start a new tracker");
- mr.startTaskTracker(null, null, ++numTracker, numDir);
-
- // Start a tracker
- LOG.info("Start a new tracker");
- mr.startTaskTracker(null, null, ++numTracker, numDir);
-
- // Check if the jobs are still running
-
- // Wait for the tracker to be lost
- boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
- while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
- assertFalse("JobTracker has opened up scheduling before all the"
- + " trackers were recovered", shouldSchedule);
- UtilsForTests.waitFor(100);
-
- // snapshot jobtracker's scheduling status
- shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
- }
-
- assertTrue("JobTracker hasnt opened up scheduling even all the"
- + " trackers were recovered",
- jobtracker.recoveryManager.shouldSchedule());
-
- assertEquals("Recovery manager is in inconsistent state",
- 0, jobtracker.recoveryManager.recoveredTrackers.size());
-
- // wait for the job to be complete
- UtilsForTests.waitTillDone(jobClient);
- }
-
- private boolean checkTrackers(JobTracker jobtracker, Set<String> present,
- Set<String> absent) {
- long jobtrackerRecoveryFinishTime =
- jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
- for (String trackerName : present) {
- TaskTrackerStatus status = jobtracker.getTaskTrackerStatus(trackerName);
- // check if the status is present and also the tracker has contacted back
- // after restart
- if (status == null
- || status.getLastSeen() < jobtrackerRecoveryFinishTime) {
- return false;
- }
- }
- for (String trackerName : absent) {
- TaskTrackerStatus status = jobtracker.getTaskTrackerStatus(trackerName);
- // check if the status is still present
- if ( status != null) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Test {@link JobTracker}'s safe mode.
- */
- public void testJobTrackerSafeMode() throws IOException {
- String namenode = null;
- MiniDFSCluster dfs = null;
- MiniMRCluster mr = null;
- FileSystem fileSys = null;
-
- try {
- Configuration conf = new Configuration();
- conf.setBoolean("dfs.replication.considerLoad", false);
- dfs = new MiniDFSCluster(conf, 1, true, null, null);
- dfs.waitActive();
- fileSys = dfs.getFileSystem();
-
- // clean up
- fileSys.delete(testDir, true);
-
- if (!fileSys.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
- }
-
- // Write the input file
- UtilsForTests.writeFile(dfs.getNameNode(), conf,
- new Path(inDir + "/file"), (short)1);
-
- dfs.startDataNodes(conf, 1, true, null, null, null, null);
- dfs.waitActive();
-
- namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
- + (dfs.getFileSystem()).getUri().getPort();
-
- // Make sure that jobhistory leads to a proper job restart
- // So keep the blocksize and the buffer size small
- JobConf jtConf = new JobConf();
- jtConf.set("mapred.jobtracker.job.history.block.size", "512");
- jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
- jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
- jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
- jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
- jtConf.setInt("mapred.reduce.copy.backoff", 4);
- jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
-
- mr = new MiniMRCluster(numTrackers, namenode, numDir, null, null, jtConf);
-
- // Test Lost tracker case
- testSafeMode(dfs, mr);
- } finally {
- if (mr != null) {
- try {
- mr.shutdown();
- } catch (Exception e) {}
- }
- if (dfs != null) {
- try {
- dfs.shutdown();
- } catch (Exception e) {}
- }
- }
- }
-
- public static void main(String[] args) throws IOException {
- new TestJobTrackerSafeMode().testJobTrackerSafeMode();
- }
-}
src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (453 lines deleted)
@@ -1,453 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
-import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
-import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Test whether the {@link RecoveryManager} is able to tolerate job-recovery
- * failures and the jobtracker is able to tolerate {@link RecoveryManager}
- * failure.
- */
-public class TestRecoveryManager extends TestCase {
- private static final Log LOG =
- LogFactory.getLog(TestRecoveryManager.class);
- private static final Path TEST_DIR =
- new Path(System.getProperty("test.build.data", "/tmp"),
- "test-recovery-manager");
-
- /**
- * Tests the {@link JobTracker} against the exceptions thrown in
- * {@link JobTracker.RecoveryManager}. It does the following :
- * - submits 2 jobs
- * - kills the jobtracker
- * - Garble job.xml for one job causing it to fail in constructor
- * and job.split for another causing it to fail in init.
- * - restarts the jobtracker
- * - checks if the jobtracker starts normally
- */
- public void testJobTracker() throws Exception {
- LOG.info("Testing jobtracker restart with faulty job");
- String signalFile = new Path(TEST_DIR, "signal").toString();
- JobConf conf = new JobConf();
-
- FileSystem fs = FileSystem.get(new Configuration());
- fs.delete(TEST_DIR, true); // cleanup
-
- conf.set("mapred.jobtracker.job.history.block.size", "1024");
- conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-
- MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-
- JobConf job1 = mr.createJobConf();
-
- UtilsForTests.configureWaitingJobConf(job1,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // submit the faulty job
- RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
- LOG.info("Submitted job " + rJob1.getID());
-
- while (rJob1.mapProgress() < 0.5f) {
- LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
- UtilsForTests.waitFor(100);
- }
-
- JobConf job2 = mr.createJobConf();
-
- UtilsForTests.configureWaitingJobConf(job2,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // submit the faulty job
- RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
- LOG.info("Submitted job " + rJob2.getID());
-
- while (rJob2.mapProgress() < 0.5f) {
- LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
- UtilsForTests.waitFor(100);
- }
-
- // kill the jobtracker
- LOG.info("Stopping jobtracker");
- String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
- mr.stopJobTracker();
-
- // delete the job.xml of job #1 causing the job to fail in constructor
- Path jobFile =
- new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
- LOG.info("Deleting job.xml file : " + jobFile.toString());
- fs.delete(jobFile, false); // delete the job.xml file
-
- // re-create the job.xml file with a single garbage byte
- FSDataOutputStream out = fs.create(jobFile);
- out.write(1);
- out.close();
-
- // delete the job.split of job #2 causing the job to fail in initTasks
- Path jobSplitFile =
- new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
- LOG.info("Deleting job.split file : " + jobSplitFile.toString());
- fs.delete(jobSplitFile, false); // delete the job.split file
-
- // re-create the job.split file with a single garbage byte
- out = fs.create(jobSplitFile);
- out.write(1);
- out.close();
-
- // make sure that the jobtracker is in recovery mode
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
- // start the jobtracker
- LOG.info("Starting jobtracker");
- mr.startJobTracker();
- ClusterStatus status =
- mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
-
- // check if the jobtracker came up or not
- assertEquals("JobTracker crashed!",
- JobTracker.State.RUNNING, status.getJobTrackerState());
-
- mr.shutdown();
- }
-
- /**
- * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
- * during recovery. It does the following :
- * - submits a job with HIGH priority and x tasks
- * - allows it to complete 50%
- * - submits another job with normal priority and y tasks
- * - kills the jobtracker
- * - restarts the jobtracker with max-tasks-per-job such that
- * y < max-tasks-per-job < x
- * - checks if the jobtracker starts normally and job#2 is recovered while
- * job#1 is failed.
- */
- public void testRecoveryManager() throws Exception {
- LOG.info("Testing recovery-manager");
- String signalFile = new Path(TEST_DIR, "signal").toString();
-
- // clean up
- FileSystem fs = FileSystem.get(new Configuration());
- fs.delete(TEST_DIR, true);
-
- JobConf conf = new JobConf();
- conf.set("mapred.jobtracker.job.history.block.size", "1024");
- conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-
- MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
- JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
- JobConf job1 = mr.createJobConf();
- // set the high priority
- job1.setJobPriority(JobPriority.HIGH);
-
- UtilsForTests.configureWaitingJobConf(job1,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // submit the faulty job
- JobClient jc = new JobClient(job1);
- RunningJob rJob1 = jc.submitJob(job1);
- LOG.info("Submitted first job " + rJob1.getID());
-
- while (rJob1.mapProgress() < 0.5f) {
- LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
- UtilsForTests.waitFor(100);
- }
-
- // now submit job2
- JobConf job2 = mr.createJobConf();
-
- String signalFile1 = new Path(TEST_DIR, "signal1").toString();
- UtilsForTests.configureWaitingJobConf(job2,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0,
- "test-recovery-manager", signalFile1, signalFile1);
-
- // submit the job
- RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
- LOG.info("Submitted job " + rJob2.getID());
-
- // wait for it to init
- JobInProgress jip = jobtracker.getJob(rJob2.getID());
-
- while (!jip.inited()) {
- LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
- UtilsForTests.waitFor(100);
- }
-
- // now submit job3 with inappropriate acls
- JobConf job3 = mr.createJobConf();
- job3.set("hadoop.job.ugi","abc,users");
-
- UtilsForTests.configureWaitingJobConf(job3,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // submit the job
- RunningJob rJob3 = (new JobClient(job3)).submitJob(job3);
- LOG.info("Submitted job " + rJob3.getID() + " with different user");
-
- jip = jobtracker.getJob(rJob3.getID());
-
- while (!jip.inited()) {
- LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
- UtilsForTests.waitFor(100);
- }
-
- // kill the jobtracker
- LOG.info("Stopping jobtracker");
- mr.stopJobTracker();
-
- // make sure that the jobtracker is in recovery mode
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
- mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
-
- mr.getJobTrackerConf().setBoolean("mapred.acls.enabled" , true);
- UserGroupInformation ugi = UserGroupInformation.readFrom(job1);
- mr.getJobTrackerConf().set("mapred.queue.default.acl-submit-job",
- ugi.getUserName());
-
- // start the jobtracker
- LOG.info("Starting jobtracker");
- mr.startJobTracker();
- UtilsForTests.waitForJobTracker(jc);
-
- jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
- // assert that job2 is recovered by the jobtracker as job1 would fail
- assertEquals("Recovery manager failed to tolerate job failures",
- 2, jobtracker.getAllJobs().length);
-
- // check if the job#1 has failed
- JobStatus status = jobtracker.getJobStatus(rJob1.getID());
- assertEquals("Faulty job not failed",
- JobStatus.FAILED, status.getRunState());
-
- jip = jobtracker.getJob(rJob2.getID());
- assertFalse("Job should be running", jip.isComplete());
-
- status = jobtracker.getJobStatus(rJob3.getID());
- assertNull("Job should be missing", status);
-
- mr.shutdown();
- }
-
- /**
- * Test if restart count of the jobtracker is correctly managed.
- * Steps are as follows :
- * - start the jobtracker and check if the info file gets created.
- * - stops the jobtracker, deletes the jobtracker.info file and checks if
- * upon restart the recovery is 'off'
- * - submit a job to the jobtracker.
- * - restart the jobtracker k times and check if the restart count on ith
- * iteration is i.
- * - submit a new job and check if its restart count is 0.
- * - garble the jobtracker.info file and restart the jobtracker, the
- * jobtracker should crash.
- */
- public void testRestartCount() throws Exception {
- LOG.info("Testing restart-count");
- String signalFile = new Path(TEST_DIR, "signal").toString();
-
- // clean up
- FileSystem fs = FileSystem.get(new Configuration());
- fs.delete(TEST_DIR, true);
-
- JobConf conf = new JobConf();
- conf.set("mapred.jobtracker.job.history.block.size", "1024");
- conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
- conf.setBoolean("mapred.jobtracker.restart.recover", true);
- // since there is no need for initing
- conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
- TaskScheduler.class);
-
- MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
- JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
- JobClient jc = new JobClient(mr.createJobConf());
-
- // check if the jobtracker info file exists
- Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
- assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));
-
- // check if garbling the system files disables the recovery process
- LOG.info("Stopping jobtracker for testing with system files deleted");
- mr.stopJobTracker();
-
- // delete the info file
- Path rFile = jobtracker.recoveryManager.getRestartCountFile();
- fs.delete(rFile,false);
-
- // start the jobtracker
- LOG.info("Starting jobtracker with system files deleted");
- mr.startJobTracker();
-
- UtilsForTests.waitForJobTracker(jc);
- jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
- // check if the recovery is disabled
- assertFalse("Recovery is not disabled upon missing system files",
- jobtracker.recoveryManager.shouldRecover());
-
- // check if the system dir is sane
- assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
- Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
- assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-
- // submit a job
- JobConf job = mr.createJobConf();
-
- UtilsForTests.configureWaitingJobConf(job,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // submit the faulty job
- RunningJob rJob = jc.submitJob(job);
- LOG.info("Submitted first job " + rJob.getID());
-
- // wait for 1 min
- UtilsForTests.waitFor(60000);
-
- // kill the jobtracker multiple times and check if the count is correct
- for (int i = 1; i <= 5; ++i) {
- LOG.info("Stopping jobtracker for " + i + " time");
- mr.stopJobTracker();
-
- // start the jobtracker
- LOG.info("Starting jobtracker for " + i + " time");
- mr.startJobTracker();
-
- UtilsForTests.waitForJobTracker(jc);
-
- // check if the system dir is sane
- assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
- assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-
- jobtracker = mr.getJobTrackerRunner().getJobTracker();
- JobInProgress jip = jobtracker.getJob(rJob.getID());
-
- // assert if restart count is correct
- assertEquals("Recovery manager failed to recover restart count",
- i, jip.getNumRestarts());
- }
-
- // kill the old job
- rJob.killJob();
-
- // II. Submit a new job and check if the restart count is 0
- JobConf job1 = mr.createJobConf();
-
- UtilsForTests.configureWaitingJobConf(job1,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // submit a new job
- rJob = jc.submitJob(job1);
- LOG.info("Submitted first job after restart" + rJob.getID());
-
- // assert if restart count is correct
- JobInProgress jip = jobtracker.getJob(rJob.getID());
- assertEquals("Restart count for new job is incorrect",
- 0, jip.getNumRestarts());
-
- LOG.info("Stopping jobtracker for testing the fs errors");
- mr.stopJobTracker();
-
- // check if system.dir problems in recovery kills the jobtracker
- fs.delete(rFile, false);
- FSDataOutputStream out = fs.create(rFile);
- out.writeBoolean(true);
- out.close();
-
- // start the jobtracker
- LOG.info("Starting jobtracker with fs errors");
- mr.startJobTracker();
- JobTrackerRunner runner = mr.getJobTrackerRunner();
- assertFalse("JobTracker is still alive", runner.isActive());
-
- mr.shutdown();
- }
-
- /**
- * Test if the jobtracker waits for the info file to be created before
- * starting.
- */
- public void testJobTrackerInfoCreation() throws Exception {
- LOG.info("Testing jobtracker.info file");
- Configuration dfsConf = new Configuration();
- MiniDFSCluster dfs = new MiniDFSCluster(dfsConf, 1, true, null);
- String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
- + (dfs.getFileSystem()).getUri().getPort();
- // shut down the data nodes
- dfs.shutdownDataNodes();
-
- // start the jobtracker
- JobConf conf = new JobConf(dfsConf);
- FileSystem.setDefaultUri(conf, namenode);
- conf.set("mapred.job.tracker", "localhost:0");
- conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
-
- JobTracker jobtracker = new JobTracker(conf);
-
- // now check if the update restart count works fine or not
- boolean failed = false;
- try {
- jobtracker.recoveryManager.updateRestartCount();
- } catch (IOException ioe) {
- failed = true;
- }
- assertTrue("JobTracker created info files without datanodes!!!", failed);
-
- Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
- Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
- FileSystem fs = dfs.getFileSystem();
- assertFalse("Info file exists after update failure",
- fs.exists(restartFile));
- assertFalse("Temporary restart-file exists after update failure",
- fs.exists(tmpRestartFile));
-
- // start 1 data node
- dfs.startDataNodes(conf, 1, true, null, null, null, null);
- dfs.waitActive();
-
- failed = false;
- try {
- jobtracker.recoveryManager.updateRestartCount();
- } catch (IOException ioe) {
- failed = true;
- }
- assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
- }
-}
src/webapps/hdfs/dfsnodelist_txt.jsp (50 lines deleted)
@@ -1,50 +0,0 @@
-<%@ page
- contentType="text/plain; charset=UTF-8"
- import="javax.servlet.*"
- import="javax.servlet.http.*"
- import="java.io.*"
- import="java.util.*"
- import="org.apache.hadoop.fs.*"
- import="org.apache.hadoop.hdfs.*"
- import="org.apache.hadoop.hdfs.server.common.*"
- import="org.apache.hadoop.hdfs.server.namenode.*"
- import="org.apache.hadoop.hdfs.server.datanode.*"
- import="org.apache.hadoop.hdfs.protocol.*"
- import="org.apache.hadoop.util.*"
- import="java.text.DateFormat"
- import="java.lang.Math"
- import="java.net.URLEncoder"
-%>
-<%!
-JspHelper jspHelper = new JspHelper();
-public void generateDFSNodesList(JspWriter out, NameNode nn,
- HttpServletRequest request)
- throws IOException {
- ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> excluded = new ArrayList<DatanodeDescriptor>();
- jspHelper.DFSNodesStatus(live, dead, excluded);
- String whatNodes = request.getParameter("whatNodes");
- ArrayList<DatanodeDescriptor> toBePrinted;
- if ("DEAD".equalsIgnoreCase(whatNodes)) {
- toBePrinted = dead;
- } else if ("EXCLUDED".equalsIgnoreCase(whatNodes)) {
- toBePrinted = excluded;
- } else if ("LIVE".equalsIgnoreCase(whatNodes)) {
- toBePrinted = live;
- } else { // Default is all nodes
- toBePrinted = new ArrayList<DatanodeDescriptor>();
- toBePrinted.addAll(dead);
- toBePrinted.addAll(excluded);
- toBePrinted.addAll(live);
- }
- for (DatanodeDescriptor d : toBePrinted) {
- out.print(d.getHostName() + "\n");
- }
-}
-%>
-<%
- NameNode nn = (NameNode)application.getAttribute("name.node");
- FSNamesystem fsn = nn.getNamesystem();
- generateDFSNodesList(out, nn, request);
-%>
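
The deleted dfsnodelist_txt.jsp emitted one datanode hostname per line, filtered by the whatNodes request parameter (LIVE, DEAD, or EXCLUDED; anything else returned all nodes). Purely as an illustration of how that endpoint would have been consumed — the NameNode host and HTTP port below are placeholders, not taken from this commit — a minimal Java client might look like:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;

public class DfsNodeListFetchSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical NameNode web address; substitute the real host and HTTP port.
    String base = args.length > 0 ? args[0] : "http://namenode.example.com:50070";
    // whatNodes selects LIVE, DEAD, or EXCLUDED; any other value returns all nodes.
    URL url = new URL(base + "/dfsnodelist_txt.jsp?whatNodes=LIVE");
    BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), "UTF-8"));
    try {
      String host;
      while ((host = in.readLine()) != null) {
        System.out.println(host);  // one datanode hostname per line
      }
    } finally {
      in.close();
    }
  }
}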