Skip to content

Commit

Permalink
HDFS-6955. DN should reserve disk space for a full block when creatin…
Browse files Browse the repository at this point in the history
…g tmp files (Contributed by Kanaka Kumar Avvaru)
  • Loading branch information
vinayakumarb committed Sep 18, 2015
1 parent a7201d6 commit 92c1af1
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 56 deletions.
Expand Up @@ -117,7 +117,7 @@ class BlockReceiver implements Closeable {
/** the block to receive */ /** the block to receive */
private final ExtendedBlock block; private final ExtendedBlock block;
/** the replica to write */ /** the replica to write */
private final ReplicaInPipelineInterface replicaInfo; private ReplicaInPipelineInterface replicaInfo;
/** pipeline stage */ /** pipeline stage */
private final BlockConstructionStage stage; private final BlockConstructionStage stage;
private final boolean isTransfer; private final boolean isTransfer;
Expand Down Expand Up @@ -259,6 +259,9 @@ class BlockReceiver implements Closeable {
} catch (ReplicaNotFoundException bne) { } catch (ReplicaNotFoundException bne) {
throw bne; throw bne;
} catch(IOException ioe) { } catch(IOException ioe) {
if (replicaInfo != null) {
replicaInfo.releaseAllBytesReserved();
}
IOUtils.closeStream(this); IOUtils.closeStream(this);
cleanupBlock(); cleanupBlock();


Expand Down
Expand Up @@ -62,13 +62,13 @@ public interface FsVolumeSpi {
boolean isTransientStorage(); boolean isTransientStorage();


/** /**
* Reserve disk space for an RBW block so a writer does not run out of * Reserve disk space for a block (RBW or Re-replicating)
* space before the block is full. * so a writer does not run out of space before the block is full.
*/ */
void reserveSpaceForRbw(long bytesToReserve); void reserveSpaceForReplica(long bytesToReserve);


/** /**
* Release disk space previously reserved for RBW block. * Release disk space previously reserved for block opened for write.
*/ */
void releaseReservedSpace(long bytesToRelease); void releaseReservedSpace(long bytesToRelease);


Expand Down
Expand Up @@ -1157,7 +1157,7 @@ private synchronized ReplicaBeingWritten append(String bpid,


// Replace finalized replica by a RBW replica in replicas map // Replace finalized replica by a RBW replica in replicas map
volumeMap.add(bpid, newReplicaInfo); volumeMap.add(bpid, newReplicaInfo);
v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes()); v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
return newReplicaInfo; return newReplicaInfo;
} }


Expand Down Expand Up @@ -1487,7 +1487,7 @@ public ReplicaHandler createTemporary(
} }
ReplicaInPipeline newReplicaInfo = ReplicaInPipeline newReplicaInfo =
new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
f.getParentFile(), 0); f.getParentFile(), b.getLocalBlock().getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo); volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return new ReplicaHandler(newReplicaInfo, ref); return new ReplicaHandler(newReplicaInfo, ref);
} else { } else {
Expand Down Expand Up @@ -1604,7 +1604,7 @@ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
// remove from volumeMap // remove from volumeMap
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());

// delete the on-disk temp file // delete the on-disk temp file
if (delBlockFromDisk(replicaInfo.getBlockFile(), if (delBlockFromDisk(replicaInfo.getBlockFile(),
replicaInfo.getMetaFile(), b.getLocalBlock())) { replicaInfo.getMetaFile(), b.getLocalBlock())) {
Expand Down Expand Up @@ -2555,14 +2555,15 @@ private static class VolumeInfo {
final long usedSpace; // size of space used by HDFS final long usedSpace; // size of space used by HDFS
final long freeSpace; // size of free space excluding reserved space final long freeSpace; // size of free space excluding reserved space
final long reservedSpace; // size of space reserved for non-HDFS final long reservedSpace; // size of space reserved for non-HDFS
final long reservedSpaceForRBW; // size of space reserved RBW final long reservedSpaceForReplicas; // size of space reserved RBW or
// re-replication


VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) { VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
this.directory = v.toString(); this.directory = v.toString();
this.usedSpace = usedSpace; this.usedSpace = usedSpace;
this.freeSpace = freeSpace; this.freeSpace = freeSpace;
this.reservedSpace = v.getReserved(); this.reservedSpace = v.getReserved();
this.reservedSpaceForRBW = v.getReservedForRbw(); this.reservedSpaceForReplicas = v.getReservedForReplicas();
} }
} }


Expand Down Expand Up @@ -2596,7 +2597,7 @@ public Map<String, Object> getVolumeInfoMap() {
innerInfo.put("usedSpace", v.usedSpace); innerInfo.put("usedSpace", v.usedSpace);
innerInfo.put("freeSpace", v.freeSpace); innerInfo.put("freeSpace", v.freeSpace);
innerInfo.put("reservedSpace", v.reservedSpace); innerInfo.put("reservedSpace", v.reservedSpace);
innerInfo.put("reservedSpaceForRBW", v.reservedSpaceForRBW); innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
info.put(v.directory, innerInfo); info.put(v.directory, innerInfo);
} }
return info; return info;
Expand Down
Expand Up @@ -22,8 +22,8 @@
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
Expand All @@ -40,9 +40,6 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.DF;
Expand All @@ -54,21 +51,24 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/** /**
* The underlying volume used to store replica. * The underlying volume used to store replica.
* *
Expand All @@ -90,8 +90,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final long reserved; private final long reserved;
private CloseableReferenceCount reference = new CloseableReferenceCount(); private CloseableReferenceCount reference = new CloseableReferenceCount();


// Disk space reserved for open blocks. // Disk space reserved for blocks (RBW or Re-replicating) open for write.
private AtomicLong reservedForRbw; private AtomicLong reservedForReplicas;
private long recentReserved = 0;


// Capacity configured. This is useful when we want to // Capacity configured. This is useful when we want to
// limit the visible capacity for tests. If negative, then we just // limit the visible capacity for tests. If negative, then we just
Expand All @@ -113,8 +114,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
this.reserved = conf.getLong( this.reserved = conf.getLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
this.reservedForRbw = new AtomicLong(0L); this.reservedForReplicas = new AtomicLong(0L);
this.currentDir = currentDir; this.currentDir = currentDir;
File parent = currentDir.getParentFile(); File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf); this.usage = new DF(parent, conf);
this.storageType = storageType; this.storageType = storageType;
Expand Down Expand Up @@ -353,19 +354,25 @@ public void setCapacityForTesting(long capacity) {
*/ */
@Override @Override
public long getAvailable() throws IOException { public long getAvailable() throws IOException {
long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get(); long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get();
long available = usage.getAvailable() - reserved - reservedForRbw.get(); long available = usage.getAvailable() - reserved
- reservedForReplicas.get();
if (remaining > available) { if (remaining > available) {
remaining = available; remaining = available;
} }
return (remaining > 0) ? remaining : 0; return (remaining > 0) ? remaining : 0;
} }


@VisibleForTesting @VisibleForTesting
public long getReservedForRbw() { public long getReservedForReplicas() {
return reservedForRbw.get(); return reservedForReplicas.get();
} }


@VisibleForTesting
long getRecentReserved() {
return recentReserved;
}

long getReserved(){ long getReserved(){
return reserved; return reserved;
} }
Expand Down Expand Up @@ -412,13 +419,20 @@ public String[] getBlockPoolList() {
*/ */
File createTmpFile(String bpid, Block b) throws IOException { File createTmpFile(String bpid, Block b) throws IOException {
checkReference(); checkReference();
return getBlockPoolSlice(bpid).createTmpFile(b); reserveSpaceForReplica(b.getNumBytes());
try {
return getBlockPoolSlice(bpid).createTmpFile(b);
} catch (IOException exception) {
releaseReservedSpace(b.getNumBytes());
throw exception;
}
} }


@Override @Override
public void reserveSpaceForRbw(long bytesToReserve) { public void reserveSpaceForReplica(long bytesToReserve) {
if (bytesToReserve != 0) { if (bytesToReserve != 0) {
reservedForRbw.addAndGet(bytesToReserve); reservedForReplicas.addAndGet(bytesToReserve);
recentReserved = bytesToReserve;
} }
} }


Expand All @@ -428,14 +442,15 @@ public void releaseReservedSpace(long bytesToRelease) {


long oldReservation, newReservation; long oldReservation, newReservation;
do { do {
oldReservation = reservedForRbw.get(); oldReservation = reservedForReplicas.get();
newReservation = oldReservation - bytesToRelease; newReservation = oldReservation - bytesToRelease;
if (newReservation < 0) { if (newReservation < 0) {
// Failsafe, this should never occur in practice, but if it does we don't // Failsafe, this should never occur in practice, but if it does we
// want to start advertising more space than we have available. // don't want to start advertising more space than we have available.
newReservation = 0; newReservation = 0;
} }
} while (!reservedForRbw.compareAndSet(oldReservation, newReservation)); } while (!reservedForReplicas.compareAndSet(oldReservation,
newReservation));
} }
} }


Expand Down Expand Up @@ -779,7 +794,7 @@ public FsDatasetSpi getDataset() {
*/ */
File createRbwFile(String bpid, Block b) throws IOException { File createRbwFile(String bpid, Block b) throws IOException {
checkReference(); checkReference();
reserveSpaceForRbw(b.getNumBytes()); reserveSpaceForReplica(b.getNumBytes());
try { try {
return getBlockPoolSlice(bpid).createRbwFile(b); return getBlockPoolSlice(bpid).createRbwFile(b);
} catch (IOException exception) { } catch (IOException exception) {
Expand All @@ -790,16 +805,15 @@ File createRbwFile(String bpid, Block b) throws IOException {


/** /**
* *
* @param bytesReservedForRbw Space that was reserved during * @param bytesReserved Space that was reserved during
* block creation. Now that the block is being finalized we * block creation. Now that the block is being finalized we
* can free up this space. * can free up this space.
* @return * @return
* @throws IOException * @throws IOException
*/ */
File addFinalizedBlock(String bpid, Block b, File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
File f, long bytesReservedForRbw)
throws IOException { throws IOException {
releaseReservedSpace(bytesReservedForRbw); releaseReservedSpace(bytesReserved);
return getBlockPoolSlice(bpid).addFinalizedBlock(b, f); return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
} }


Expand Down
Expand Up @@ -492,7 +492,7 @@ public boolean isTransientStorage() {
} }


@Override @Override
public void reserveSpaceForRbw(long bytesToReserve) { public void reserveSpaceForReplica(long bytesToReserve) {
} }


@Override @Override
Expand Down
Expand Up @@ -612,7 +612,7 @@ public boolean isTransientStorage() {
} }


@Override @Override
public void reserveSpaceForRbw(long bytesToReserve) { public void reserveSpaceForReplica(long bytesToReserve) {
} }


@Override @Override
Expand Down
Expand Up @@ -74,7 +74,7 @@ public boolean isTransientStorage() {
} }


@Override @Override
public void reserveSpaceForRbw(long bytesToReserve) { public void reserveSpaceForReplica(long bytesToReserve) {
} }


@Override @Override
Expand Down

0 comments on commit 92c1af1

Please sign in to comment.