diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index c14526b6a11b3..b2a0e0514b4c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1576,7 +1576,8 @@ void startStandbyServices(final Configuration conf, boolean isObserver) // Disable quota checks while in standby. dir.disableQuotaChecks(); - editLogTailer = new EditLogTailer(this, conf); + boolean shouldTriggerActiveLogRoll = isObserver ? false : true; + editLogTailer = new EditLogTailer(this, conf , shouldTriggerActiveLogRoll); editLogTailer.start(); if (!isObserver && standbyShouldCheckpoint) { standbyCheckpointer = new StandbyCheckpointer(conf, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index d14d8a8892202..b78776fa0036d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -178,8 +178,9 @@ public class EditLogTailer { */ private Timer timer; - public EditLogTailer(FSNamesystem namesystem, Configuration conf) { - this.tailerThread = new EditLogTailerThread(); + public EditLogTailer(FSNamesystem namesystem, Configuration conf, + boolean shouldTriggerActiveLogRoll) { + this.tailerThread = new EditLogTailerThread(shouldTriggerActiveLogRoll); this.conf = conf; this.namesystem = namesystem; this.timer = new Timer(); @@ -476,14 +477,21 @@ void sleep(long sleepTimeMillis) throws InterruptedException { */ private class EditLogTailerThread extends Thread { private volatile boolean shouldRun = true; + private final boolean shouldTriggerActiveLogRoll; - private EditLogTailerThread() { + private EditLogTailerThread(boolean shouldTriggerActiveLogRoll) { super("Edit log tailer"); + this.shouldTriggerActiveLogRoll = shouldTriggerActiveLogRoll; } private void setShouldRun(boolean shouldRun) { this.shouldRun = shouldRun; } + + @VisibleForTesting + public long getLastRollTimeMs(){ + return lastRollTimeMs; + } @Override public void run() { @@ -507,7 +515,8 @@ private void doWork() { // triggered. boolean triggeredLogRoll = false; if (tooLongSinceLastLoad() && - lastRollTriggerTxId < lastLoadedTxnId) { + lastRollTriggerTxId < lastLoadedTxnId && + shouldTriggerActiveLogRoll) { triggerActiveLogRoll(); triggeredLogRoll = true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java index 168273117b50f..c87812ab1c0d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java @@ -203,7 +203,7 @@ public void testDelegationTokenDFSApi() throws Exception { private class EditLogTailerForTest extends EditLogTailer { public EditLogTailerForTest(FSNamesystem namesystem, Configuration conf) { - super(namesystem, conf); + super(namesystem, conf, true); } public void catchupDuringFailover() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index 4e88cd389ea3d..f32c6fac0c277 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -177,7 +177,7 @@ public void testTailerBackoff() throws Exception { final Queue sleepDurations = new ConcurrentLinkedQueue<>(); final int zeroEditCount = 5; final AtomicInteger tailEditsCallCount = new AtomicInteger(0); - EditLogTailer tailer = new EditLogTailer(mockNamesystem, conf) { + EditLogTailer tailer = new EditLogTailer(mockNamesystem, conf, true) { @Override void sleep(long sleepTimeMs) { if (sleepDurations.size() <= zeroEditCount) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index a293cb4d17c47..2ce27eec9d913 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -164,7 +164,7 @@ public void testObserverRequeue() throws Exception { FileStatus fileStatus = scheduledFuture.get(10000, TimeUnit.MILLISECONDS); assertNotNull(fileStatus); } finally { - EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf); + EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf, false); observerFsNS.setEditLogTailerForTests(editLogTailer); editLogTailer.start(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyRollEditsLogOnly.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyRollEditsLogOnly.java new file mode 100644 index 0000000000000..b33704579deda --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyRollEditsLogOnly.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; +import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.OBSERVER_PROBE_RETRY_PERIOD_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestStandbyRollEditsLogOnly { + private static Configuration conf; + private static MiniQJMHACluster qjmhaCluster; + private static MiniDFSCluster dfsCluster; + @BeforeClass + public static void startUpCluster() throws Exception { + conf = new Configuration(); + conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); + conf.set(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, "5"); + conf.setTimeDuration( + OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); + qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true); + dfsCluster = qjmhaCluster.getDfsCluster(); + } + + @Test(timeout=60000) + public void testOnlyStandbyRollEditlog() throws IOException, + InterruptedException { + dfsCluster.transitionToActive(0); + dfsCluster.transitionToStandby(1); + dfsCluster.transitionToObserver(2); + dfsCluster.waitActive(); + NameNode standbyNameNode = dfsCluster.getNameNode(1); + NameNode observerNameNode = dfsCluster.getNameNode(2); + Assert.assertEquals("transitionToStandby failed !", + "standby",standbyNameNode.getNamesystem().getHAState() ); + Assert.assertEquals("transitionToObserver failed !", + "observer",observerNameNode.getNamesystem().getHAState()); + + long standbyInitialRollTime = + standbyNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + long observerInitialRollTime = + observerNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + //wait for roll edits log + Thread.sleep(6000); + long standbyLastRollTime = + standbyNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + long observerLastRollTime = + observerNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + assertTrue("Standby should roll the log!", + standbyLastRollTime > standbyInitialRollTime); + assertEquals("The observer is not expected to roll the log.", + observerInitialRollTime , observerLastRollTime); + } + + @Test(timeout=60000) + public void testTransObToStandbyThenRollLog() throws IOException, + InterruptedException { + + int standbyNameNodeIndex = getStandbyNameNode(); + int observerNameNodeIndex = getObserverNameNode(); + assert standbyNameNodeIndex > 0; + assert observerNameNodeIndex > 0; + dfsCluster.transitionToObserver(standbyNameNodeIndex); + dfsCluster.transitionToStandby(observerNameNodeIndex); + NameNode standbyNameNode = dfsCluster.getNameNode(observerNameNodeIndex); + NameNode observerNameNode = dfsCluster.getNameNode(standbyNameNodeIndex); + Assert.assertEquals("transitionToStandby failed !", + "standby",standbyNameNode.getNamesystem().getHAState() ); + Assert.assertEquals("transitionToObserver failed !", + "observer",observerNameNode.getNamesystem().getHAState()); + + long standbyInitialRollTime = + standbyNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + long observerInitialRollTime = + observerNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + //wait for roll edits log + Thread.sleep(6000); + long standbyLastRollTime = + standbyNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + long observerLastRollTime = + observerNameNode.getNamesystem().getEditLogTailer().getLastRollTimeMs(); + assertTrue("Standby should roll the log", + standbyLastRollTime > standbyInitialRollTime); + Assert.assertEquals("The observer is not expected to roll the log.", + observerInitialRollTime , observerLastRollTime); + } + + private int getObserverNameNode(){ + for (int i = 0; i < dfsCluster.getNumNameNodes(); i++) { + if(dfsCluster.getNameNode(i).isObserverState()){ + return i; + } + } + return -1; + } + + private int getStandbyNameNode(){ + for (int i = 0; i < dfsCluster.getNumNameNodes(); i++) { + if(dfsCluster.getNameNode(i).isStandbyState()){ + return i; + } + } + return -1; + } + + @AfterClass + public static void shutDownCluster() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } +}