Commit 1a495fb

HDFS-7645. Rolling upgrade is restoring blocks from trash multiple times
(Contributed by Vinayakumar B and Keisuke Ogiwara)

arp7 committed Mar 30, 2015
1 parent cc0a01c · commit 1a495fb

Showing 16 changed files with 118 additions and 63 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -359,6 +359,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7890. Improve information on Top users for metrics in
     RollingWindowsManager and lower log level (J.Andreina via vinayakumarb)
 
+    HDFS-7645. Rolling upgrade is restoring blocks from trash multiple times.
+    (Vinayakumar B and Keisuke Ogiwara via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -29,12 +29,12 @@
 @InterfaceStability.Evolving
 public class RollingUpgradeInfo extends RollingUpgradeStatus {
   private final long startTime;
-  private final long finalizeTime;
+  private long finalizeTime;
   private boolean createdRollbackImages;
 
   public RollingUpgradeInfo(String blockPoolId, boolean createdRollbackImages,
       long startTime, long finalizeTime) {
-    super(blockPoolId);
+    super(blockPoolId, finalizeTime != 0);
     this.createdRollbackImages = createdRollbackImages;
     this.startTime = startTime;
     this.finalizeTime = finalizeTime;
@@ -56,11 +56,23 @@ public boolean isStarted() {
   public long getStartTime() {
     return startTime;
   }
 
+  @Override
   public boolean isFinalized() {
     return finalizeTime != 0;
   }
 
+  /**
+   * Finalize the upgrade if not already finalized
+   * @param finalizeTime
+   */
+  public void finalize(long finalizeTime) {
+    if (finalizeTime != 0) {
+      this.finalizeTime = finalizeTime;
+      createdRollbackImages = false;
+    }
+  }
+
   public long getFinalizeTime() {
     return finalizeTime;
   }
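
Note: with finalizeTime now mutable, a single RollingUpgradeInfo instance is finalized in place rather than replaced. A minimal usage sketch of the new finalize()/isFinalized() pair (illustrative only, not part of the patch; the block pool id and timestamps are made up):

    RollingUpgradeInfo info = new RollingUpgradeInfo(
        "BP-1-127.0.0.1-1", false, System.currentTimeMillis(), 0);
    assert !info.isFinalized();     // finalizeTime == 0: still in progress

    info.finalize(System.currentTimeMillis());  // record the finalize time in place,
                                                // also resets createdRollbackImages
    assert info.isFinalized();                  // status now reports finalized
    info.finalize(0);                           // a zero time is ignored (no-op)
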
@@ -27,15 +27,21 @@
 @InterfaceStability.Evolving
 public class RollingUpgradeStatus {
   private final String blockPoolId;
+  private final boolean finalized;
 
-  public RollingUpgradeStatus(String blockPoolId) {
+  public RollingUpgradeStatus(String blockPoolId, boolean finalized) {
     this.blockPoolId = blockPoolId;
+    this.finalized = finalized;
   }
 
   public String getBlockPoolId() {
     return blockPoolId;
   }
 
+  public boolean isFinalized() {
+    return finalized;
+  }
+
   @Override
   public int hashCode() {
     return blockPoolId.hashCode();
@@ -48,8 +54,9 @@ public boolean equals(Object obj) {
     } else if (obj == null || !(obj instanceof RollingUpgradeStatus)) {
       return false;
     }
-    final RollingUpgradeStatus that = (RollingUpgradeStatus)obj;
-    return this.blockPoolId.equals(that.blockPoolId);
+    final RollingUpgradeStatus that = (RollingUpgradeStatus) obj;
+    return this.blockPoolId.equals(that.blockPoolId)
+        && this.isFinalized() == that.isFinalized();
   }
 
   @Override
@@ -1686,11 +1686,13 @@ public static RollingUpgradeStatusProto convertRollingUpgradeStatus(
       RollingUpgradeStatus status) {
     return RollingUpgradeStatusProto.newBuilder()
         .setBlockPoolId(status.getBlockPoolId())
+        .setFinalized(status.isFinalized())
         .build();
   }
 
   public static RollingUpgradeStatus convert(RollingUpgradeStatusProto proto) {
-    return new RollingUpgradeStatus(proto.getBlockPoolId());
+    return new RollingUpgradeStatus(proto.getBlockPoolId(),
+        proto.getFinalized());
   }
 
   public static RollingUpgradeInfoProto convert(RollingUpgradeInfo info) {
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -470,15 +471,19 @@ List<BPServiceActor> getBPServiceActors() {
 
   /**
    * Signal the current rolling upgrade status as indicated by the NN.
-   * @param inProgress true if a rolling upgrade is in progress
+   * @param rollingUpgradeStatus rolling upgrade status
    */
-  void signalRollingUpgrade(boolean inProgress) throws IOException {
+  void signalRollingUpgrade(RollingUpgradeStatus rollingUpgradeStatus)
+      throws IOException {
+    if (rollingUpgradeStatus == null) {
+      return;
+    }
     String bpid = getBlockPoolId();
-    if (inProgress) {
+    if (!rollingUpgradeStatus.isFinalized()) {
       dn.getFSDataset().enableTrash(bpid);
       dn.getFSDataset().setRollingUpgradeMarker(bpid);
     } else {
-      dn.getFSDataset().restoreTrash(bpid);
+      dn.getFSDataset().clearTrash(bpid);
       dn.getFSDataset().clearRollingUpgradeMarker(bpid);
     }
   }
@@ -662,7 +662,7 @@ private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
           " in HeartbeatResponse. Expected " +
           bpos.getBlockPoolId());
     } else {
-      bpos.signalRollingUpgrade(rollingUpgradeStatus != null);
+      bpos.signalRollingUpgrade(rollingUpgradeStatus);
     }
   }
 
@@ -351,7 +351,8 @@ private void doTransition(DataNode datanode, StorageDirectory sd,
         sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
         " both be present.");
       doRollback(sd, nsInfo); // rollback if applicable
-    } else {
+    } else if (startOpt == StartupOption.ROLLBACK &&
+        !sd.getPreviousDir().exists()) {
       // Restore all the files in the trash. The restored files are retained
       // during rolling upgrade rollback. They are deleted during rolling
       // upgrade downgrade.
@@ -378,6 +379,12 @@ private void doTransition(DataNode datanode, StorageDirectory sd,
         && this.cTime == nsInfo.getCTime()) {
       return; // regular startup
     }
+    if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
+      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
+      LOG.info("Restored " + restored + " block files from trash " +
+        "before the layout upgrade. These blocks will be moved to " +
+        "the previous directory during the upgrade");
+    }
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
       doUpgrade(datanode, sd, nsInfo); // upgrade
@@ -730,16 +737,12 @@ String getRestoreDirectory(File blockFile) {
   /**
    * Delete all files and directories in the trash directories.
    */
-  public void restoreTrash() {
+  public void clearTrash() {
     for (StorageDirectory sd : storageDirs) {
       File trashRoot = getTrashRootDir(sd);
-      try {
-        Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
-        restoreBlockFilesFromTrash(trashRoot);
-        FileUtil.fullyDelete(getTrashRootDir(sd));
-      } catch (IOException ioe) {
-        LOG.warn("Restoring trash failed for storage directory " + sd);
-      }
+      Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
+      FileUtil.fullyDelete(trashRoot);
+      LOG.info("Cleared trash for storage directory " + sd);
     }
   }
 
@@ -168,11 +168,11 @@ public void enableTrash(String bpid) {
     }
   }
 
-  public void restoreTrash(String bpid) {
+  public void clearTrash(String bpid) {
     if (trashEnabledBpids.contains(bpid)) {
-      getBPStorage(bpid).restoreTrash();
+      getBPStorage(bpid).clearTrash();
       trashEnabledBpids.remove(bpid);
-      LOG.info("Restored trash for bpid " + bpid);
+      LOG.info("Cleared trash for bpid " + bpid);
     }
   }
 
@@ -490,9 +490,9 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid,
   public void enableTrash(String bpid);
 
   /**
-   * Restore trash
+   * Clear trash
    */
-  public void restoreTrash(String bpid);
+  public void clearTrash(String bpid);
 
   /**
    * @return true when trash is enabled
@@ -2619,8 +2619,8 @@ public void enableTrash(String bpid) {
   }
 
   @Override
-  public void restoreTrash(String bpid) {
-    dataStorage.restoreTrash(bpid);
+  public void clearTrash(String bpid) {
+    dataStorage.clearTrash(bpid);
   }
 
   @Override
@@ -7568,7 +7568,7 @@ public RollingUpgradeInfo.Bean getRollingUpgradeStatus() {
 
   /** Is rolling upgrade in progress? */
   public boolean isRollingUpgrade() {
-    return rollingUpgradeInfo != null;
+    return rollingUpgradeInfo != null && !rollingUpgradeInfo.isFinalized();
   }
 
   void checkRollingUpgrade(String action) throws RollingUpgradeException {
@@ -7583,16 +7583,15 @@ RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.WRITE);
     writeLock();
-    final RollingUpgradeInfo returnInfo;
     try {
       checkOperation(OperationCategory.WRITE);
       if (!isRollingUpgrade()) {
         return null;
       }
       checkNameNodeSafeMode("Failed to finalize rolling upgrade");
 
-      returnInfo = finalizeRollingUpgradeInternal(now());
-      getEditLog().logFinalizeRollingUpgrade(returnInfo.getFinalizeTime());
+      finalizeRollingUpgradeInternal(now());
+      getEditLog().logFinalizeRollingUpgrade(rollingUpgradeInfo.getFinalizeTime());
       if (haEnabled) {
         // roll the edit log to make sure the standby NameNode can tail
         getFSImage().rollEditLog();
@@ -7612,14 +7611,12 @@ RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
     }
-    return returnInfo;
+    return rollingUpgradeInfo;
   }
 
-  RollingUpgradeInfo finalizeRollingUpgradeInternal(long finalizeTime)
-      throws RollingUpgradeException {
-    final long startTime = rollingUpgradeInfo.getStartTime();
-    rollingUpgradeInfo = null;
-    return new RollingUpgradeInfo(blockPoolId, false, startTime, finalizeTime);
+  void finalizeRollingUpgradeInternal(long finalizeTime) {
+    // Set the finalize time
+    rollingUpgradeInfo.finalize(finalizeTime);
   }
 
   long addCacheDirective(CacheDirectiveInfo directive,
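
Note: the NameNode-side change keeps rollingUpgradeInfo around after finalization instead of nulling it out, so isRollingUpgrade() has to exclude the finalized case explicitly. A condensed illustration of the two states (not the literal patched code; now() stands in for the FSNamesystem time source used above):

    // Upgrade started: info exists and is not finalized.
    rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId, false, now(), 0);
    assert isRollingUpgrade();

    // finalizeRollingUpgrade() path, condensed: mark the same object finalized.
    rollingUpgradeInfo.finalize(now());
    assert !isRollingUpgrade();                // finalized upgrades no longer count as in progress
    assert rollingUpgradeInfo.isFinalized();   // but the finalized status remains available
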
1 change: 1 addition & 0 deletions hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -607,4 +607,5 @@ message SnapshotInfoProto {
  */
 message RollingUpgradeStatusProto {
   required string blockPoolId = 1;
+  optional bool finalized = 2 [default = false];
 }
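
Note: because the new field is optional with default = false, a RollingUpgradeStatusProto produced by an older NameNode still parses and simply reads as not finalized. A small round-trip sketch using the PBHelper conversions from the hunk above (the block pool id is illustrative):

    RollingUpgradeStatus status =
        new RollingUpgradeStatus("BP-1-127.0.0.1-1", true);
    RollingUpgradeStatusProto proto =
        PBHelper.convertRollingUpgradeStatus(status);   // carries finalized = true on the wire
    RollingUpgradeStatus roundTripped = PBHelper.convert(proto);
    assert roundTripped.isFinalized();                  // survives the round trip
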
@@ -79,13 +79,17 @@
 <button type="button" class="close" data-dismiss="alert" aria-hidden="true">&times;</button>
 
 {#RollingUpgradeStatus}
+  {@if cond="{finalizeTime} > 0"}
+    <p>Rolling upgrade finalized at {#helper_date_tostring value="{finalizeTime}"/}. </p>
+  {:else}
     <p>Rolling upgrade started at {#helper_date_tostring value="{startTime}"/}. </br>
     {#createdRollbackImages}
       Rollback image has been created. Proceed to upgrade daemons.
       {:else}
       Rollback image has not been created.
     {/createdRollbackImages}
     </p>
+  {/if}
 {/RollingUpgradeStatus}
 
 {@if cond="{DistinctVersionCount} > 1"}
@@ -1226,7 +1226,7 @@ public void enableTrash(String bpid) {
   }
 
   @Override
-  public void restoreTrash(String bpid) {
+  public void clearTrash(String bpid) {
   }
 
   @Override
@@ -19,12 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -43,7 +38,9 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.TestRollingUpgrade;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -208,29 +205,53 @@ private void rollbackRollingUpgrade() throws Exception {
   public void testDatanodeRollingUpgradeWithFinalize() throws Exception {
     try {
       startCluster();
-
-      // Create files in DFS.
-      Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
-      Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat");
-      DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
-      DFSTestUtil.createFile(fs, testFile2, FILE_SIZE, REPL_FACTOR, SEED);
-
-      startRollingUpgrade();
-      File blockFile = getBlockForFile(testFile2, true);
-      File trashFile = getTrashFileForBlock(blockFile, false);
-      deleteAndEnsureInTrash(testFile2, blockFile, trashFile);
-      finalizeRollingUpgrade();
-
-      // Ensure that delete file testFile2 stays deleted after finalize
-      assertFalse(isTrashRootPresent());
-      assert(!fs.exists(testFile2));
-      assert(fs.exists(testFile1));
+      rollingUpgradeAndFinalize();
+      // Do it again
+      rollingUpgradeAndFinalize();
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testDatanodeRUwithRegularUpgrade() throws Exception {
+    try {
+      startCluster();
+      rollingUpgradeAndFinalize();
+
+      DataNodeProperties dn = cluster.stopDataNode(0);
+      cluster.restartNameNode(0, true, "-upgrade");
+      cluster.restartDataNode(dn, true);
+      cluster.waitActive();
+      fs = cluster.getFileSystem(0);
+      Path testFile3 = new Path("/" + GenericTestUtils.getMethodName()
+          + ".03.dat");
+      DFSTestUtil.createFile(fs, testFile3, FILE_SIZE, REPL_FACTOR, SEED);
+      cluster.getFileSystem().finalizeUpgrade();
     } finally {
       shutdownCluster();
     }
   }
 
+  private void rollingUpgradeAndFinalize() throws IOException, Exception {
+    // Create files in DFS.
+    Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
+    Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat");
+    DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
+    DFSTestUtil.createFile(fs, testFile2, FILE_SIZE, REPL_FACTOR, SEED);
+
+    startRollingUpgrade();
+    File blockFile = getBlockForFile(testFile2, true);
+    File trashFile = getTrashFileForBlock(blockFile, false);
+    cluster.triggerBlockReports();
+    deleteAndEnsureInTrash(testFile2, blockFile, trashFile);
+    finalizeRollingUpgrade();
+
+    // Ensure that delete file testFile2 stays deleted after finalize
+    assertFalse(isTrashRootPresent());
+    assert(!fs.exists(testFile2));
+    assert(fs.exists(testFile1));
+  }
+
   @Test (timeout=600000)
   public void testDatanodeRollingUpgradeWithRollback() throws Exception {
     try {
