HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid
explicitly creating Files in the tests code. (lei)
Lei Xu committed Oct 20, 2015
1 parent 9cb5d35 commit 71e533a
Showing 5 changed files with 232 additions and 119 deletions.
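In outline, the change replaces hand-built replica files in the tests with factory methods on FsDatasetTestUtils. A minimal before/after sketch, lifted from the testDeletingBlocks diff below (eb, ds, vol, cluster, and BLOCKPOOL are assumed to be set up as in that test):

    // Before: the test reaches into FsDatasetImpl and creates the block and
    // meta Files by hand.
    ReplicaInfo info = new FinalizedReplica(
        eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
    ds.volumeMap.add(BLOCKPOOL, info);
    info.getBlockFile().createNewFile();
    info.getMetaFile().createNewFile();

    // After: one call against the test-utils abstraction; no Files in test code.
    cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);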
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1540,6 +1540,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9250. Add Precondition check to LocatedBlock#addCachedLoc.
     (Xiao Chen via wang)
 
+    HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid
+    explicitly creating Files in the tests code. (lei)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.FileNotFoundException;
@@ -137,4 +138,58 @@ interface MaterializedReplica {
    */
   MaterializedReplica getMaterializedReplica(ExtendedBlock block)
       throws ReplicaNotFoundException;
+
+  /**
+   * Create a finalized replica and add it into the FsDataset.
+   */
+  Replica createFinalizedReplica(ExtendedBlock block) throws IOException;
+
+  /**
+   * Create a finalized replica on a particular volume, and add it into
+   * the FsDataset.
+   */
+  Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
+      throws IOException;
+
+  /**
+   * Create a {@link ReplicaInPipeline} and add it into the FsDataset.
+   */
+  Replica createReplicaInPipeline(ExtendedBlock block) throws IOException;
+
+  /**
+   * Create a {@link ReplicaInPipeline} on the particular volume, and add
+   * it into the FsDataset.
+   */
+  Replica createReplicaInPipeline(FsVolumeSpi volume, ExtendedBlock block)
+      throws IOException;
+
+  /**
+   * Create a {@link ReplicaBeingWritten} and add it into the FsDataset.
+   */
+  Replica createRBW(ExtendedBlock block) throws IOException;
+
+  /**
+   * Create a {@link ReplicaBeingWritten} on the particular volume, and add it
+   * into the FsDataset.
+   */
+  Replica createRBW(FsVolumeSpi volume, ExtendedBlock block) throws IOException;
+
+  /**
+   * Create a {@link ReplicaWaitingToBeRecovered} object and add it into the
+   * FsDataset.
+   */
+  Replica createReplicaWaitingToBeRecovered(ExtendedBlock block)
+      throws IOException;
+
+  /**
+   * Create a {@link ReplicaWaitingToBeRecovered} on the particular volume,
+   * and add it into the FsDataset.
+   */
+  Replica createReplicaWaitingToBeRecovered(
+      FsVolumeSpi volume, ExtendedBlock block) throws IOException;
+
+  /**
+   * Create a {@link ReplicaUnderRecovery} object and add it into the FsDataset.
+   */
+  Replica createReplicaUnderRecovery(ExtendedBlock block, long recoveryId)
+      throws IOException;
 }
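A hypothetical usage sketch (not part of this commit) of the new interface methods. It assumes a running MiniDFSCluster and uses only methods visible in the diffs on this page, so treat it as illustrative rather than authoritative:

    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;

    public class ReplicaStateSketch {
      public static void main(String[] args) throws Exception {
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
        try {
          cluster.waitActive();
          String bpid = cluster.getNamesystem().getBlockPoolId();
          // Test-utils handle for DataNode 0, as used in testDeletingBlocks below.
          FsDatasetTestUtils utils = cluster.getFsDatasetTestUtils(0);
          // Each call registers a replica in the given state with the DataNode's
          // FsDataset and creates any on-disk state itself; the caller never
          // touches block or meta Files.
          utils.createFinalizedReplica(new ExtendedBlock(bpid, 1, 1, 1001));
          utils.createReplicaInPipeline(new ExtendedBlock(bpid, 2, 1, 1002));
          utils.createRBW(new ExtendedBlock(bpid, 3, 1, 1003));
          utils.createReplicaWaitingToBeRecovered(new ExtendedBlock(bpid, 4, 1, 1004));
          utils.createReplicaUnderRecovery(new ExtendedBlock(bpid, 5, 1, 1005), 100);
        } finally {
          cluster.shutdown();
        }
      }
    }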
@@ -23,10 +23,20 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -176,4 +186,103 @@ public MaterializedReplica getMaterializedReplica(ExtendedBlock block)
         blockFile, block.getGenerationStamp());
     return new FsDatasetImplMaterializedReplica(blockFile, metaFile);
   }
+
+  @Override
+  public Replica createFinalizedReplica(ExtendedBlock block)
+      throws IOException {
+    try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
+      return createFinalizedReplica(volumes.get(0), block);
+    }
+  }
+
+  @Override
+  public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
+      throws IOException {
+    FsVolumeImpl vol = (FsVolumeImpl) volume;
+    ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
+        vol.getCurrentDir().getParentFile());
+    dataset.volumeMap.add(block.getBlockPoolId(), info);
+    info.getBlockFile().createNewFile();
+    info.getMetaFile().createNewFile();
+    return info;
+  }
+
+  @Override
+  public Replica createReplicaInPipeline(ExtendedBlock block)
+      throws IOException {
+    try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
+      return createReplicaInPipeline(volumes.get(0), block);
+    }
+  }
+
+  @Override
+  public Replica createReplicaInPipeline(
+      FsVolumeSpi volume, ExtendedBlock block) throws IOException {
+    FsVolumeImpl vol = (FsVolumeImpl) volume;
+    ReplicaInPipeline rip = new ReplicaInPipeline(
+        block.getBlockId(), block.getGenerationStamp(), volume,
+        vol.createTmpFile(
+            block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
+        0);
+    dataset.volumeMap.add(block.getBlockPoolId(), rip);
+    return rip;
+  }
+
+  @Override
+  public Replica createRBW(ExtendedBlock eb) throws IOException {
+    try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
+      return createRBW(volumes.get(0), eb);
+    }
+  }
+
+  @Override
+  public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
+      throws IOException {
+    FsVolumeImpl vol = (FsVolumeImpl) volume;
+    final String bpid = eb.getBlockPoolId();
+    final Block block = eb.getLocalBlock();
+    ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+        eb.getLocalBlock(), volume,
+        vol.createRbwFile(bpid, block).getParentFile(), null);
+    rbw.getBlockFile().createNewFile();
+    rbw.getMetaFile().createNewFile();
+    dataset.volumeMap.add(bpid, rbw);
+    return rbw;
+  }
+
+  @Override
+  public Replica createReplicaWaitingToBeRecovered(ExtendedBlock eb)
+      throws IOException {
+    try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
+      return createReplicaWaitingToBeRecovered(volumes.get(0), eb);
+    }
+  }
+
+  @Override
+  public Replica createReplicaWaitingToBeRecovered(
+      FsVolumeSpi volume, ExtendedBlock eb) throws IOException {
+    FsVolumeImpl vol = (FsVolumeImpl) volume;
+    final String bpid = eb.getBlockPoolId();
+    final Block block = eb.getLocalBlock();
+    ReplicaWaitingToBeRecovered rwbr =
+        new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
+            vol.createRbwFile(bpid, block).getParentFile());
+    dataset.volumeMap.add(bpid, rwbr);
+    return rwbr;
+  }
+
+  @Override
+  public Replica createReplicaUnderRecovery(
+      ExtendedBlock block, long recoveryId) throws IOException {
+    try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
+      FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
+      ReplicaUnderRecovery rur = new ReplicaUnderRecovery(new FinalizedReplica(
+          block.getLocalBlock(), volume, volume.getCurrentDir().getParentFile()),
+          recoveryId);
+      dataset.volumeMap.add(block.getBlockPoolId(), rur);
+      return rur;
+    }
+  }
 }
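Two design points worth noting in the implementation above. First, each no-volume overload delegates to its FsVolumeSpi counterpart with volumes.get(0), holding the FsVolumeReferences in try-with-resources so the volume references are released once the replica is registered. Second, the volume-specific overloads let a test pin replicas to a chosen volume. A hedged fragment, continuing the sketch above and assuming the DataNode was configured with at least two volumes:

    // Pin a finalized replica to the second volume of DataNode 0.
    // DataNodeTestUtils.getFSDataset and FsVolumeReferences appear in the
    // TestFsDatasetImpl diff below; the two-volume setup is an assumption.
    DataNode dn = cluster.getDataNodes().get(0);
    try (FsDatasetSpi.FsVolumeReferences volumes =
        DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences()) {
      FsVolumeSpi second = volumes.get(1);
      utils.createFinalizedReplica(second, new ExtendedBlock(bpid, 6, 1, 1006));
    }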
@@ -365,28 +365,26 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
 
   @Test
   public void testDeletingBlocks() throws IOException {
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
 
-      FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
+      ds.addBlockPool(BLOCKPOOL, conf);
       FsVolumeImpl vol;
       try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) {
         vol = (FsVolumeImpl) volumes.get(0);
       }
 
       ExtendedBlock eb;
       ReplicaInfo info;
-      List<Block> blockList = new ArrayList<Block>();
+      List<Block> blockList = new ArrayList<>();
       for (int i = 1; i <= 63; i++) {
         eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
-        info = new FinalizedReplica(
-            eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
-        ds.volumeMap.add(BLOCKPOOL, info);
-        info.getBlockFile().createNewFile();
-        info.getMetaFile().createNewFile();
-        blockList.add(info);
+        cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
+        blockList.add(eb.getLocalBlock());
       }
       ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
       try {
@@ -398,12 +396,8 @@ public void testDeletingBlocks() throws IOException {
 
       blockList.clear();
       eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
-      info = new FinalizedReplica(
-          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
-      ds.volumeMap.add(BLOCKPOOL, info);
-      info.getBlockFile().createNewFile();
-      info.getMetaFile().createNewFile();
-      blockList.add(info);
+      cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
+      blockList.add(eb.getLocalBlock());
       ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
       try {
         Thread.sleep(1000);
