<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array">
    <added>
      <filename>conf/mapred-queue-acls.xml.template</filename>
    </added>
    <added>
      <filename>src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java</filename>
    </added>
  </added>
  <modified type="array">
    <modified>
      <diff>@@ -303,6 +303,16 @@
             &lt;em&gt;mapred.queue.queue-name.acl-name&lt;/em&gt;, defined below.
           &lt;/td&gt;
         &lt;/tr&gt;
+		  &lt;/table&gt;
+      
+      &lt;p&gt;&lt;br/&gt;&lt;code&gt; conf/mapred-queue-acls.xml&lt;/code&gt;&lt;/p&gt;
+      
+      &lt;table&gt;
+       &lt;tr&gt;
+          &lt;th&gt;Parameter&lt;/th&gt;
+          &lt;th&gt;Value&lt;/th&gt; 
+          &lt;th&gt;Notes&lt;/th&gt;
+       &lt;/tr&gt;
         &lt;tr&gt;
           &lt;td&gt;mapred.queue.&lt;em&gt;queue-name&lt;/em&gt;.acl-submit-job&lt;/td&gt;
           &lt;td&gt;List of users and groups that can submit jobs to the
@@ -330,7 +340,8 @@
             his/her own job, irrespective of the ACLs.
           &lt;/td&gt;
         &lt;/tr&gt;
-		  &lt;/table&gt;
+      &lt;/table&gt;
+      
 
           &lt;p&gt;Typically all the above parameters are marked as 
           &lt;a href=&quot;ext:api/org/apache/hadoop/conf/configuration/final_parameters&quot;&gt;</diff>
      <filename>src/docs/src/documentation/content/xdocs/cluster_setup.xml</filename>
    </modified>
    <modified>
      <diff>@@ -553,7 +553,24 @@
 			           &lt;/tr&gt;
 			     &lt;/table&gt;
 			&lt;/section&gt;
-			
+			&lt;section&gt;
+        &lt;title&gt;mradmin&lt;/title&gt;
+        &lt;p&gt;Runs MR admin client&lt;/p&gt;
+        &lt;p&gt;&lt;code&gt;Usage: hadoop mradmin  [&lt;/code&gt;
+        &lt;a href=&quot;commands_manual.html#Generic+Options&quot;&gt;GENERIC_OPTIONS&lt;/a&gt;
+        &lt;code&gt;] [-refreshQueueAcls] &lt;/code&gt;&lt;/p&gt;
+        &lt;table&gt;
+        &lt;tr&gt;
+        &lt;th&gt; COMMAND_OPTION &lt;/th&gt;&lt;th&gt; Description &lt;/th&gt;
+        &lt;/tr&gt;
+        &lt;tr&gt;
+        &lt;td&gt;&lt;code&gt;-refreshQueueAcls&lt;/code&gt;&lt;/td&gt;
+        &lt;td&gt; Refresh the queue acls used by hadoop, to check access during submissions
+        and administration of the job by the user. The properties present in
+        &lt;code&gt;mapred-queue-acls.xml&lt;/code&gt; is reloaded by the queue manager.&lt;/td&gt;
+        &lt;/tr&gt;
+        &lt;/table&gt;
+      &lt;/section&gt;
 			&lt;section&gt;
 				&lt;title&gt; jobtracker &lt;/title&gt;
 				&lt;p&gt;</diff>
      <filename>src/docs/src/documentation/content/xdocs/commands_manual.xml</filename>
    </modified>
    <modified>
      <diff>@@ -883,29 +883,6 @@
 &lt;/property&gt;
 
 &lt;property&gt;
-  &lt;name&gt;mapred.queue.default.acl-submit-job&lt;/name&gt;
-  &lt;value&gt;*&lt;/value&gt;
-  &lt;description&gt; Comma separated list of user and group names that are allowed
-    to submit jobs to the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to 
-    submit jobs. 
-  &lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
-  &lt;name&gt;mapred.queue.default.acl-administer-jobs&lt;/name&gt;
-  &lt;value&gt;*&lt;/value&gt;
-  &lt;description&gt; Comma separated list of user and group names that are allowed
-    to delete jobs or modify job's priority for jobs not owned by the current
-    user in the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
-    If set to the special value '*', it means all users are allowed to do 
-    this operation.
-  &lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
   &lt;name&gt;mapred.job.queue.name&lt;/name&gt;
   &lt;value&gt;default&lt;/value&gt;
   &lt;description&gt; Queue to which a job is submitted. This must match one of the</diff>
      <filename>src/mapred/mapred-default.xml</filename>
    </modified>
    <modified>
      <diff>@@ -86,7 +86,8 @@ import org.apache.hadoop.util.VersionInfo;
  *
  *******************************************************/
 public class JobTracker implements MRConstants, InterTrackerProtocol,
-    JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
+    JobSubmissionProtocol, TaskTrackerManager,
+    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
 
   static{
     Configuration.addDefaultResource(&quot;mapred-default.xml&quot;);
@@ -96,6 +97,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   static long RETIRE_JOB_INTERVAL;
   static long RETIRE_JOB_CHECK_INTERVAL;
+
+  
   // The interval after which one fault of a tracker will be discarded,
   // if there are no faults during this. 
   private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
@@ -205,6 +208,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       return JobSubmissionProtocol.versionID;
     } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
       return RefreshAuthorizationPolicyProtocol.versionID;
+    } else if (protocol.equals(AdminOperationsProtocol.class.getName())){
+      return AdminOperationsProtocol.versionID;
     } else {
       throw new IOException(&quot;Unknown protocol to job tracker: &quot; + protocol);
     }
@@ -1520,8 +1525,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get(&quot;mapred.hosts&quot;, &quot;&quot;),
                                            conf.get(&quot;mapred.hosts.exclude&quot;, &quot;&quot;));
-    
-    queueManager = new QueueManager(this.conf);
+
+    Configuration queuesConf = new Configuration(this.conf);
+    queueManager = new QueueManager(queuesConf);
     
     // Create the scheduler
     Class&lt;? extends TaskScheduler&gt; schedulerClass
@@ -3698,4 +3704,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       throw new IOException(jobStr.toString() + msg);
     }
   }
+
+  @Override
+  public void refreshQueueAcls() throws IOException{
+    LOG.info(&quot;Refreshing queue acls. requested by : &quot; + 
+        UserGroupInformation.getCurrentUGI().getUserName());
+    this.queueManager.refreshAcls(new Configuration(this.conf));
+  }
 }</diff>
      <filename>src/mapred/org/apache/hadoop/mapred/JobTracker.java</filename>
    </modified>
    <modified>
      <diff>@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Set;
@@ -25,10 +26,10 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Class that exposes information about queues maintained by the Hadoop
@@ -62,6 +63,9 @@ class QueueManager {
   // Whether ACLs are enabled in the system or not.
   private boolean aclsEnabled;
   
+  //Resource in which queue acls are configured.
+  static final String QUEUE_ACLS_FILE_NAME = &quot;mapred-queue-acls.xml&quot;;
+  
   /**
    * Enum representing an operation that can be performed on a queue.
    */
@@ -228,36 +232,65 @@ class QueueManager {
   }
   
   /**
-   * Refresh information configured for queues in the system by reading
-   * it from the passed in {@link org.apache.hadoop.conf.Configuration}.
-   *
-   * Previously stored information about queues is removed and new
-   * information populated from the configuration.
+   * Refresh the acls for the configured queues in the system by reading
+   * it from mapred-queue-acls.xml.
    * 
-   * @param conf New configuration for the queues. 
+   * The previous acls are removed. Previously configured queues and
+   * if or not acl is disabled is retained.
+   * 
+   * @throws IOException when queue ACL configuration file is invalid.
    */
-  public synchronized void refresh(Configuration conf) {
-    queueNames.clear();
-    aclsMap.clear();
-    schedulerInfoObjects.clear();
-    initialize(conf);
+  synchronized void refreshAcls(Configuration conf) throws IOException {
+    try {
+      HashMap&lt;String, AccessControlList&gt; newAclsMap = 
+        getQueueAcls(conf);
+      aclsMap = newAclsMap;
+    } catch (Throwable t) {
+      String exceptionString = StringUtils.stringifyException(t);
+      LOG.warn(&quot;Queue ACLs could not be refreshed because there was an &quot; +
+      		&quot;exception in parsing the configuration: &quot;+ exceptionString +
+      		&quot;. Existing ACLs are retained.&quot;);
+      throw new IOException(exceptionString);
+    }
+
   }
   
-  private void initialize(Configuration conf) {
-    aclsEnabled = conf.getBoolean(&quot;mapred.acls.enabled&quot;, false);
-    String[] queues = conf.getStrings(&quot;mapred.queue.names&quot;, 
-                                  new String[] {JobConf.DEFAULT_QUEUE_NAME});
-    addToSet(queueNames, queues);
-    
-    // for every queue, and every operation, get the ACL
-    // if any is specified and store in aclsMap.
-    for (String queue : queues) {
+  private void checkDeprecation(Configuration conf) {
+    for(String queue: queueNames) {
+      for (QueueOperation oper : QueueOperation.values()) {
+        String key = toFullPropertyName(queue, oper.getAclName());
+        String aclString = conf.get(key);
+        if(aclString != null) {
+          LOG.warn(&quot;Configuring queue ACLs in mapred-site.xml or &quot; +
+          		&quot;hadoop-site.xml is deprecated. Configure queue ACLs in &quot; + 
+          		QUEUE_ACLS_FILE_NAME);
+          return;
+        }
+      }
+    }
+  }
+  
+  private HashMap&lt;String, AccessControlList&gt; getQueueAcls(Configuration conf)  {
+    checkDeprecation(conf);
+    conf.addResource(QUEUE_ACLS_FILE_NAME);
+    HashMap&lt;String, AccessControlList&gt; aclsMap = 
+      new HashMap&lt;String, AccessControlList&gt;();
+    for (String queue : queueNames) {
       for (QueueOperation oper : QueueOperation.values()) {
         String key = toFullPropertyName(queue, oper.getAclName());
         String aclString = conf.get(key, &quot;*&quot;);
         aclsMap.put(key, new AccessControlList(aclString));
       }
-    }
+    } 
+    return aclsMap;
+  }
+  
+  private void initialize(Configuration conf) {
+    aclsEnabled = conf.getBoolean(&quot;mapred.acls.enabled&quot;, false);
+    String[] queues = conf.getStrings(&quot;mapred.queue.names&quot;, 
+        new String[] {JobConf.DEFAULT_QUEUE_NAME});
+    addToSet(queueNames, queues);
+    aclsMap = getQueueAcls(conf);
   }
   
   private static final String toFullPropertyName(String queue, </diff>
      <filename>src/mapred/org/apache/hadoop/mapred/QueueManager.java</filename>
    </modified>
    <modified>
      <diff>@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.AdminOperationsProtocol;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -34,9 +35,10 @@ import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Administrative access to Hadoop Map-Reduce.
- *
+ * 
  * Currently it only provides the ability to connect to the {@link JobTracker}
- * and refresh the service-level authorization policy.
+ * and 1) refresh the service-level authorization policy, 2) refresh queue acl
+ * properties.
  */
 public class MRAdmin extends Configured implements Tool {
 
@@ -51,21 +53,28 @@ public class MRAdmin extends Configured implements Tool {
   private static void printHelp(String cmd) {
     String summary = &quot;hadoop mradmin is the command to execute Map-Reduce administrative commands.\n&quot; +
     &quot;The full syntax is: \n\n&quot; +
-    &quot;hadoop mradmin [-refreshServiceAcl] [-help [cmd]]\n&quot;; 
+    &quot;hadoop mradmin [-refreshServiceAcl] [-refreshQueueAcls] [-help [cmd]]\n&quot;; 
 
   String refreshServiceAcl = &quot;-refreshServiceAcl: Reload the service-level authorization policy file\n&quot; +
     &quot;\t\tJobtracker will reload the authorization policy file.\n&quot;;
-  
+
+  String refreshQueueAcls =
+        &quot;-refreshQueueAcls: Reload the queue acls\n&quot;
+            + &quot;\t\tJobTracker will reload the mapred-queue-acls.xml file.\n&quot;;
+
   String help = &quot;-help [cmd]: \tDisplays help for the given command or all commands if none\n&quot; +
     &quot;\t\tis specified.\n&quot;;
 
   if (&quot;refreshServiceAcl&quot;.equals(cmd)) {
     System.out.println(refreshServiceAcl);
+  } else if (&quot;refreshQueueAcls&quot;.equals(cmd)) {
+    System.out.println(refreshQueueAcls);
   } else if (&quot;help&quot;.equals(cmd)) {
     System.out.println(help);
   } else {
     System.out.println(summary);
     System.out.println(refreshServiceAcl);
+    System.out.println(refreshQueueAcls);
     System.out.println(help);
     System.out.println();
     ToolRunner.printGenericCommandUsage(System.out);
@@ -79,11 +88,13 @@ public class MRAdmin extends Configured implements Tool {
    */
   private static void printUsage(String cmd) {
     if (&quot;-refreshServiceAcl&quot;.equals(cmd)) {
-      System.err.println(&quot;Usage: java MRAdmin&quot;
-                         + &quot; [-refreshServiceAcl]&quot;);
+      System.err.println(&quot;Usage: java MRAdmin&quot; + &quot; [-refreshServiceAcl]&quot;);
+    } else if (&quot;-refreshQueueAcls&quot;.equals(cmd)) {
+      System.err.println(&quot;Usage: java MRAdmin&quot; + &quot; [-refreshQueueAcls]&quot;);
     } else {
       System.err.println(&quot;Usage: java MRAdmin&quot;);
       System.err.println(&quot;           [-refreshServiceAcl]&quot;);
+      System.err.println(&quot;           [-refreshQueueAcls]&quot;);
       System.err.println(&quot;           [-help [cmd]]&quot;);
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -120,7 +131,25 @@ public class MRAdmin extends Configured implements Tool {
     
     return 0;
   }
-  
+
+  private int refreshQueueAcls() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    AdminOperationsProtocol adminOperationsProtocol = 
+      (AdminOperationsProtocol) 
+      RPC.getProxy(AdminOperationsProtocol.class, 
+                   AdminOperationsProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             AdminOperationsProtocol.class));
+    
+    // Refresh the queue properties
+    adminOperationsProtocol.refreshQueueAcls();
+    
+    return 0;
+  }
 
   @Override
   public int run(String[] args) throws Exception {
@@ -136,7 +165,7 @@ public class MRAdmin extends Configured implements Tool {
     //
     // verify that we have enough command line parameters
     //
-    if (&quot;-refreshServiceAcl&quot;.equals(cmd)) {
+    if (&quot;-refreshServiceAcl&quot;.equals(cmd) || &quot;-refreshQueueAcls&quot;.equals(cmd)) {
       if (args.length != 1) {
         printUsage(cmd);
         return exitCode;
@@ -147,6 +176,8 @@ public class MRAdmin extends Configured implements Tool {
     try {
       if (&quot;-refreshServiceAcl&quot;.equals(cmd)) {
         exitCode = refreshAuthorizationPolicy();
+      } else if (&quot;-refreshQueueAcls&quot;.equals(cmd)) {
+        exitCode = refreshQueueAcls();
       } else if (&quot;-help&quot;.equals(cmd)) {
         if (i &lt; args.length) {
           printUsage(args[i]);
@@ -189,5 +220,4 @@ public class MRAdmin extends Configured implements Tool {
     int result = ToolRunner.run(new MRAdmin(), args);
     System.exit(result);
   }
-
 }</diff>
      <filename>src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java</filename>
    </modified>
    <modified>
      <diff>@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -28,13 +32,13 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class TestQueueManager extends TestCase {
 
@@ -195,6 +199,169 @@ public class TestQueueManager extends TestCase {
     verifyJobPriorityChangeAsOtherUser(conf, false, 
                               &quot;junk-user,junk-user-group&quot;);
   }
+
+  /**
+   * Test to verify refreshing of queue properties by using MRAdmin tool.
+   * 
+   * @throws Exception
+   */
+  public void testACLRefresh() throws Exception {
+    String queueConfigPath =
+        System.getProperty(&quot;test.build.extraconf&quot;, &quot;build/test/extraconf&quot;);
+    File queueConfigFile =
+        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+    File hadoopConfigFile = new File(queueConfigPath, &quot;mapred-site.xml&quot;);
+    try {
+      //Setting up default mapred-site.xml
+      Properties hadoopConfProps = new Properties();
+      //these properties should be retained.
+      hadoopConfProps.put(&quot;mapred.queue.names&quot;, &quot;default,q1,q2&quot;);
+      hadoopConfProps.put(&quot;mapred.acls.enabled&quot;, &quot;true&quot;);
+      //These property should always be overridden
+      hadoopConfProps.put(&quot;mapred.queue.default.acl-submit-job&quot;, &quot;u1&quot;);
+      hadoopConfProps.put(&quot;mapred.queue.q1.acl-submit-job&quot;, &quot;u2&quot;);
+      hadoopConfProps.put(&quot;mapred.queue.q2.acl-submit-job&quot;, &quot;u1&quot;);
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      
+      //Actual property which would be used.
+      Properties queueConfProps = new Properties();
+      queueConfProps.put(&quot;mapred.queue.default.acl-submit-job&quot;, &quot; &quot;);
+      //Writing out the queue configuration file.
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      
+      //Create a new configuration to be used with QueueManager
+      JobConf conf = new JobConf();
+      QueueManager queueManager = new QueueManager(conf);
+      UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
+      //Job Submission should fail because ugi to be used is set to blank.
+      assertFalse(&quot;User Job Submission Succeeded before refresh.&quot;,
+          queueManager.hasAccess(&quot;default&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse(&quot;User Job Submission Succeeded before refresh.&quot;,
+          queueManager.hasAccess(&quot;q1&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse(&quot;User Job Submission Succeeded before refresh.&quot;,
+          queueManager.hasAccess(&quot;q2&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      
+      //Test job submission as alternate user.
+      Configuration alternateUserConfig = new Configuration();
+      alternateUserConfig.set(&quot;hadoop.job.ugi&quot;,&quot;u1,users&quot;);
+      UserGroupInformation alternateUgi = 
+        UserGroupInformation.readFrom(alternateUserConfig);
+      assertTrue(&quot;Alternate User Job Submission failed before refresh.&quot;,
+          queueManager.hasAccess(&quot;q2&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, alternateUgi));
+      
+      //Set acl for the current user.
+      queueConfProps.put(&quot;mapred.queue.default.acl-submit-job&quot;, ugi.getUserName());
+      queueConfProps.put(&quot;mapred.queue.q1.acl-submit-job&quot;, ugi.getUserName());
+      queueConfProps.put(&quot;mapred.queue.q2.acl-submit-job&quot;, ugi.getUserName());
+      //write out queue-acls.xml.
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      //refresh configuration
+      queueManager.refreshAcls(conf);
+      //Submission should succeed
+      assertTrue(&quot;User Job Submission failed after refresh.&quot;,
+          queueManager.hasAccess(&quot;default&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue(&quot;User Job Submission failed after refresh.&quot;,
+          queueManager.hasAccess(&quot;q1&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue(&quot;User Job Submission failed after refresh.&quot;,
+          queueManager.hasAccess(&quot;q2&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertFalse(&quot;Alternate User Job Submission succeeded after refresh.&quot;,
+          queueManager.hasAccess(&quot;q2&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, alternateUgi));
+      //delete the ACL file.
+      queueConfigFile.delete();
+      
+      //rewrite the mapred-site.xml
+      hadoopConfProps.put(&quot;mapred.acls.enabled&quot;, &quot;true&quot;);
+      hadoopConfProps.put(&quot;mapred.queue.default.acl-submit-job&quot;, ugi.getUserName());
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      queueManager.refreshAcls(conf);
+      assertTrue(&quot;User Job Submission failed after refresh and no queue acls file.&quot;,
+          queueManager.hasAccess(&quot;default&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+    } finally{
+      if(queueConfigFile.exists()) {
+        queueConfigFile.delete();
+      }
+      if(hadoopConfigFile.exists()) {
+        hadoopConfigFile.delete();
+      }
+    }
+  }
+
+  public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
+    String queueConfigPath =
+      System.getProperty(&quot;test.build.extraconf&quot;, &quot;build/test/extraconf&quot;);
+    File queueConfigFile =
+      new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+    File hadoopConfigFile = new File(queueConfigPath, &quot;hadoop-site.xml&quot;);
+    try {
+      // queue properties with which the cluster is started.
+      Properties hadoopConfProps = new Properties();
+      hadoopConfProps.put(&quot;mapred.queue.names&quot;, &quot;default,q1,q2&quot;);
+      hadoopConfProps.put(&quot;mapred.acls.enabled&quot;, &quot;true&quot;);
+      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      
+      //properties for mapred-queue-acls.xml
+      Properties queueConfProps = new Properties();
+      UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
+      queueConfProps.put(&quot;mapred.queue.default.acl-submit-job&quot;, ugi.getUserName());
+      queueConfProps.put(&quot;mapred.queue.q1.acl-submit-job&quot;, ugi.getUserName());
+      queueConfProps.put(&quot;mapred.queue.q2.acl-submit-job&quot;, ugi.getUserName());
+      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      
+      Configuration conf = new JobConf();
+      QueueManager queueManager = new QueueManager(conf);
+      //Testing access to queue.
+      assertTrue(&quot;User Job Submission failed.&quot;,
+          queueManager.hasAccess(&quot;default&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue(&quot;User Job Submission failed.&quot;,
+          queueManager.hasAccess(&quot;q1&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue(&quot;User Job Submission failed.&quot;,
+          queueManager.hasAccess(&quot;q2&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      
+      //Write out a new incomplete invalid configuration file.
+      PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
+      writer.println(&quot;&lt;configuration&gt;&quot;);
+      writer.println(&quot;&lt;property&gt;&quot;);
+      writer.flush();
+      writer.close();
+      try {
+        //Exception to be thrown by queue manager because configuration passed
+        //is invalid.
+        queueManager.refreshAcls(conf);
+        fail(&quot;Refresh of ACLs should have failed with invalid conf file.&quot;);
+      } catch (Exception e) {
+      }
+      assertTrue(&quot;User Job Submission failed after invalid conf file refresh.&quot;,
+          queueManager.hasAccess(&quot;default&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue(&quot;User Job Submission failed after invalid conf file refresh.&quot;,
+          queueManager.hasAccess(&quot;q1&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+      assertTrue(&quot;User Job Submission failed after invalid conf file refresh.&quot;,
+          queueManager.hasAccess(&quot;q2&quot;, QueueManager.QueueOperation.
+              SUBMIT_JOB, ugi));
+    } finally {
+      //Cleanup the configuration files in all cases
+      if(hadoopConfigFile.exists()) {
+        hadoopConfigFile.delete();
+      }
+      if(queueConfigFile.exists()) {
+        queueConfigFile.delete();
+      }
+    }
+  }
+  
   
   private JobConf setupConf(String aclName, String aclValue) {
     JobConf conf = new JobConf();
@@ -217,10 +384,20 @@ public class TestQueueManager extends TestCase {
   }
 
   private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
-                                    String queue) throws IOException {
+      String queue) throws IOException {
     setUpCluster(conf);
     try {
-      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
+      runAndVerifySubmission(conf, shouldSucceed, queue, null);
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
+      String queue, String userInfo)
+      throws IOException {
+    try {
+      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
       if (shouldSucceed) {
         assertTrue(rjob.isSuccessful());
       } else {
@@ -411,14 +588,14 @@ public class TestQueueManager extends TestCase {
     if (queueName != null) {
       clientConf.setQueueName(queueName);
     }
+    JobConf jc = new JobConf(clientConf);
+    if (userInfo != null) {
+      jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+    }
     RunningJob rJob = null;
     if (shouldComplete) {
-      rJob = JobClient.runJob(clientConf);  
+      rJob = JobClient.runJob(jc);  
     } else {
-      JobConf jc = new JobConf(clientConf);
-      if (userInfo != null) {
-        jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
-      }
       rJob = new JobClient(clientConf).submitJob(jc);
     }
     return rJob;</diff>
      <filename>src/test/org/apache/hadoop/mapred/TestQueueManager.java</filename>
    </modified>
    <modified>
      <diff>@@ -22,7 +22,11 @@ import java.text.DecimalFormat;
 import java.io.*;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Enumeration;
+import java.util.Properties;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.Path;
@@ -49,6 +53,7 @@ import org.apache.hadoop.mapred.lib.IdentityReducer;
  */
 public class UtilsForTests {
 
+  static final Log LOG = LogFactory.getLog(UtilsForTests.class);
   final static long KB = 1024L * 1;
   final static long MB = 1024L * KB;
   final static long GB = 1024L * MB;
@@ -656,4 +661,19 @@ public class UtilsForTests {
       }
     }
   }
+
+  static void setUpConfigFile(Properties confProps, File configFile)
+    throws IOException {
+    Configuration config = new Configuration(false);
+    FileOutputStream fos = new FileOutputStream(configFile);
+
+    for (Enumeration&lt;?&gt; e = confProps.propertyNames(); e.hasMoreElements();) {
+      String key = (String) e.nextElement();
+      config.set(key, confProps.getProperty(key));
+    }
+
+    config.writeXml(fos);
+    fos.close();
+  }
 }
+</diff>
      <filename>src/test/org/apache/hadoop/mapred/UtilsForTests.java</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>508e588f309af125780024a4f9699d9304f70068</id>
    </parent>
  </parents>
  <author>
    <name>Lee Tucker</name>
    <email>ltucker@whatslowthis-lm.corp.yahoo.com</email>
  </author>
  <url>http://github.com/yahoo/hadoop/commit/cdcfadee1382b30b5a4e2ac7b4f4664ce2446fa8</url>
  <id>cdcfadee1382b30b5a4e2ac7b4f4664ce2446fa8</id>
  <committed-date>2009-06-08T15:04:18-07:00</committed-date>
  <authored-date>2009-06-08T15:04:18-07:00</authored-date>
  <message>Applying patch 2636770.5396.patch</message>
  <tree>8827fe676ccdf46ec779a813ed8575e0bed13662</tree>
  <committer>
    <name>Lee Tucker</name>
    <email>ltucker@whatslowthis-lm.corp.yahoo.com</email>
  </committer>
</commit>
