HDDS-2943. Parameterize unit tests for chunk manager implementation #694

Merged: 2 commits, Mar 31, 2020
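The change set below makes chunk-file resolution a property of ChunkLayOutVersion, which is what lets the same chunk manager unit tests run once per layout. As a rough illustration, here is the standard JUnit 4 parameterization pattern this enables (the class name and test body are hypothetical, not the PR's actual tests):

import java.util.Arrays;

import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestChunkManagerAcrossLayouts {

  private final ChunkLayOutVersion layout;

  public TestChunkManagerAcrossLayouts(ChunkLayOutVersion layout) {
    this.layout = layout;
  }

  // Every @Test in the class runs once per parameter value.
  @Parameterized.Parameters(name = "{0}")
  public static Iterable<Object[]> layouts() {
    return Arrays.asList(
        new Object[] {ChunkLayOutVersion.FILE_PER_CHUNK},
        new Object[] {ChunkLayOutVersion.FILE_PER_BLOCK});
  }

  @Test
  public void writeThenReadChunk() {
    // exercise the chunk manager implementation for this.layout
  }
}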
Changes from all commits
ChunkLayOutVersion.java
@@ -18,19 +18,45 @@
package org.apache.hadoop.ozone.container.common.impl;


import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.List;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_FIND_DATA_DIR;

/**
* Defines layout versions for the Chunks.
*/
public enum ChunkLayOutVersion {

-FILE_PER_CHUNK(1, "One file per chunk"),
-FILE_PER_BLOCK(2, "One file per block");
+FILE_PER_CHUNK(1, "One file per chunk") {
+@Override
+public File getChunkFile(ContainerData containerData, BlockID blockID,
+ChunkInfo info) throws StorageContainerException {
+File chunksLoc = verifyChunkDirExists(containerData);
+return chunksLoc.toPath().resolve(info.getChunkName()).toFile();
+}
+},
+FILE_PER_BLOCK(2, "One file per block") {
+@Override
+public File getChunkFile(ContainerData containerData, BlockID blockID,
+ChunkInfo info) throws StorageContainerException {
+File chunkDir = verifyChunkDirExists(containerData);
+return new File(chunkDir, blockID.getLocalID() + ".block");
+}
+};

+private static final Logger LOG =
+LoggerFactory.getLogger(ChunkLayOutVersion.class);

private static final ChunkLayOutVersion
DEFAULT_LAYOUT = ChunkLayOutVersion.FILE_PER_BLOCK;
@@ -91,8 +117,31 @@ public String getDescription() {
return description;
}

+public abstract File getChunkFile(ContainerData containerData,
+BlockID blockID, ChunkInfo info) throws StorageContainerException;

@Override
public String toString() {
return "ChunkLayout:v" + version;
}

+private static File verifyChunkDirExists(ContainerData containerData)
+throws StorageContainerException {
+Preconditions.checkNotNull(containerData, "Container data can't be null");
+
+String chunksPath = containerData.getChunksPath();
+if (chunksPath == null) {
+LOG.error("Chunks path is null in the container data");
+throw new StorageContainerException("Unable to get Chunks directory.",
+UNABLE_TO_FIND_DATA_DIR);
+}
+File chunksLoc = new File(chunksPath);
+if (!chunksLoc.exists()) {
+LOG.error("Chunks path does not exist");
+throw new StorageContainerException("Unable to get Chunks directory.",
+UNABLE_TO_FIND_DATA_DIR);
+}
+return chunksLoc;
+}

}
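The practical difference between the two layouts, traced through the code above with made-up values (the chunk name format is up to the caller; only the ".block" suffix is fixed here):

// Illustrative values only:
//   containerData.getChunksPath() == "/data/hdds/chunks"
//   blockID.getLocalID()          == 12
//   info.getChunkName()           == "12_chunk_1"
//
// FILE_PER_CHUNK.getChunkFile(containerData, blockID, info)
//     -> /data/hdds/chunks/12_chunk_1   (a separate file per chunk)
// FILE_PER_BLOCK.getChunkFile(containerData, blockID, info)
//     -> /data/hdds/chunks/12.block     (all chunks appended to one file)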
ContainerData.java
@@ -72,6 +72,9 @@ public abstract class ContainerData {
// This can hold information like volume name, owner etc.,
private final Map<String, String> metadata;

+// Path to Physical file system where chunks are stored.
+private String chunksPath;

// State of the Container
private ContainerDataProto.State state;

@@ -227,6 +230,22 @@ public ChunkLayOutVersion getLayOutVersion() {
return ChunkLayOutVersion.getChunkLayOutVersion(layOutVersion);
}

+/**
+* Get chunks path.
+* @return - Path where chunks are stored
+*/
+public String getChunksPath() {
+return chunksPath;
+}
+
+/**
+* Set chunks Path.
+* @param chunkPath - File path.
+*/
+public void setChunksPath(String chunkPath) {
+this.chunksPath = chunkPath;
+}

/**
* Add/Update metadata.
* We should hold the container lock before updating the metadata as this
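These accessors are moved up from KeyValueContainerData (whose copies are removed in the last file below), so that ChunkLayOutVersion, which only receives a ContainerData, can resolve chunk files for any container type. A minimal caller sketch using only the base-class API from this PR; resolveChunkFile is a hypothetical helper, not part of the change:

import java.io.File;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;

static File resolveChunkFile(ContainerData data, BlockID blockID,
    ChunkInfo info) throws StorageContainerException {
  // getLayOutVersion() maps the stored version number to the enum
  // constant, which then builds the path from data.getChunksPath().
  return data.getLayOutVersion().getChunkFile(data, blockID, info);
}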
KeyValueContainerCheck.java
@@ -30,17 +30,17 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -57,7 +57,8 @@

public class KeyValueContainerCheck {

-private static final Logger LOG = LoggerFactory.getLogger(Container.class);
+private static final Logger LOG =
+LoggerFactory.getLogger(KeyValueContainerCheck.class);

private long containerID;
private KeyValueContainerData onDiskContainerData; //loaded from fs/disk
@@ -213,10 +214,9 @@ private void scanData(DataTransferThrottler throttler, Canceler canceler)
*/
Preconditions.checkState(onDiskContainerData != null,
"invoke loadContainerData prior to calling this function");
-File dbFile;
-File metaDir = new File(metadataPath);
-
-dbFile = KeyValueContainerLocationUtil
+File metaDir = new File(metadataPath);
+File dbFile = KeyValueContainerLocationUtil
.getContainerDBFile(metaDir, containerID);

if (!dbFile.exists() || !dbFile.canRead()) {
@@ -227,6 +227,9 @@ private void scanData(DataTransferThrottler throttler, Canceler canceler)
}

onDiskContainerData.setDbFile(dbFile);

+ChunkLayOutVersion layout = onDiskContainerData.getLayOutVersion();

try(ReferenceCountedDB db =
BlockUtils.getDB(onDiskContainerData, checkConfig);
KeyValueBlockIterator kvIter = new KeyValueBlockIterator(containerID,
@@ -235,8 +238,9 @@
while(kvIter.hasNext()) {
BlockData block = kvIter.nextBlock();
for(ContainerProtos.ChunkInfo chunk : block.getChunks()) {
-File chunkFile = ChunkUtils.getChunkFile(onDiskContainerData,
-ChunkInfo.getFromProtoBuf(chunk));
+File chunkFile = layout.getChunkFile(onDiskContainerData,
+block.getBlockID(), ChunkInfo.getFromProtoBuf(chunk));

if (!chunkFile.exists()) {
// concurrent mutation in Block DB? lookup the block again.
byte[] bdata = db.getStore().get(
@@ -246,52 +250,77 @@
+ chunkFile.getAbsolutePath());
}
} else if (chunk.getChecksumData().getType()
-!= ContainerProtos.ChecksumType.NONE){
-int length = chunk.getChecksumData().getChecksumsList().size();
-ChecksumData cData = new ChecksumData(
-chunk.getChecksumData().getType(),
-chunk.getChecksumData().getBytesPerChecksum(),
-chunk.getChecksumData().getChecksumsList());
-Checksum cal = new Checksum(cData.getChecksumType(),
-cData.getBytesPerChecksum());
-long bytesRead = 0;
-byte[] buffer = new byte[cData.getBytesPerChecksum()];
-try (InputStream fs = new FileInputStream(chunkFile)) {
-for (int i = 0; i < length; i++) {
-int v = fs.read(buffer);
-if (v == -1) {
-break;
-}
-bytesRead += v;
-throttler.throttle(v, canceler);
-ByteString expected = cData.getChecksums().get(i);
-ByteString actual = cal.computeChecksum(buffer, 0, v)
-.getChecksums().get(0);
-if (!expected.equals(actual)) {
-throw new OzoneChecksumException(String
-.format("Inconsistent read for chunk=%s len=%d expected" +
-" checksum %s actual checksum %s for block %s",
-chunk.getChunkName(), chunk.getLen(),
-Arrays.toString(expected.toByteArray()),
-Arrays.toString(actual.toByteArray()),
-block.getBlockID()));
-}
-
-}
-if (bytesRead != chunk.getLen()) {
-throw new OzoneChecksumException(String
-.format("Inconsistent read for chunk=%s expected length=%d"
-+ " actual length=%d for block %s",
-chunk.getChunkName(),
-chunk.getLen(), bytesRead, block.getBlockID()));
-}
-}
+!= ContainerProtos.ChecksumType.NONE) {
+verifyChecksum(block, chunk, chunkFile, layout, throttler,
+canceler);
}
}
}
}
}

+private static void verifyChecksum(BlockData block,
+ContainerProtos.ChunkInfo chunk, File chunkFile,
+ChunkLayOutVersion layout,
+DataTransferThrottler throttler, Canceler canceler) throws IOException {
+ChecksumData checksumData =
+ChecksumData.getFromProtoBuf(chunk.getChecksumData());
+int checksumCount = checksumData.getChecksums().size();
+int bytesPerChecksum = checksumData.getBytesPerChecksum();
+Checksum cal = new Checksum(checksumData.getChecksumType(),
+bytesPerChecksum);
+ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
+long bytesRead = 0;
+try (FileChannel channel = FileChannel.open(chunkFile.toPath(),
+ChunkUtils.READ_OPTIONS, ChunkUtils.NO_ATTRIBUTES)) {
+if (layout == ChunkLayOutVersion.FILE_PER_BLOCK) {
+channel.position(chunk.getOffset());
+}
+for (int i = 0; i < checksumCount; i++) {
+// limit last read for FILE_PER_BLOCK, to avoid reading next chunk
+if (layout == ChunkLayOutVersion.FILE_PER_BLOCK &&
+i == checksumCount - 1 &&
+chunk.getLen() % bytesPerChecksum != 0) {
+buffer.limit((int) (chunk.getLen() % bytesPerChecksum));
+}
+
+int v = channel.read(buffer);
+if (v == -1) {
+break;
+}
+bytesRead += v;
+buffer.flip();
+
+throttler.throttle(v, canceler);
+
+ByteString expected = checksumData.getChecksums().get(i);
+ByteString actual = cal.computeChecksum(buffer)
+.getChecksums().get(0);
+if (!expected.equals(actual)) {
+throw new OzoneChecksumException(String
+.format("Inconsistent read for chunk=%s" +
+" checksum item %d" +
+" expected checksum %s" +
+" actual checksum %s" +
+" for block %s",
+ChunkInfo.getFromProtoBuf(chunk),
+i,
+Arrays.toString(expected.toByteArray()),
+Arrays.toString(actual.toByteArray()),
+block.getBlockID()));
+}
+
+}
+if (bytesRead != chunk.getLen()) {
+throw new OzoneChecksumException(String
+.format("Inconsistent read for chunk=%s expected length=%d"
+ + " actual length=%d for block %s",
+chunk.getChunkName(),
+chunk.getLen(), bytesRead, block.getBlockID()));
+}
+}
+}

private void loadContainerData() throws IOException {
File containerFile = KeyValueContainer
.getContainerFile(metadataPath, containerID);
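The buffer-limit branch in verifyChecksum exists because a FILE_PER_BLOCK file holds every chunk of the block back to back: the channel is positioned at chunk.getOffset(), and the final read must stop at the chunk boundary. A worked example with made-up numbers:

public class ChecksumMathExample {
  public static void main(String[] args) {
    // Illustrative values only.
    long chunkLen = 1000;        // chunk.getLen()
    int bytesPerChecksum = 256;  // checksumData.getBytesPerChecksum()
    int checksumCount = 4;       // 1000 bytes -> 256 + 256 + 256 + 232
    // Reads 1-3 fill the whole 256-byte buffer. Before read 4 the buffer
    // is limited to 1000 % 256 = 232 bytes; without the limit, the read
    // would pull in the first 24 bytes of the next chunk in the same
    // block file and the last checksum comparison would fail spuriously.
    int lastLimit = (int) (chunkLen % bytesPerChecksum);
    System.out.println(lastLimit);  // 232
  }
}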
KeyValueContainerData.java
@@ -56,9 +56,6 @@ public class KeyValueContainerData extends ContainerData {
// Path to Container metadata Level DB/RocksDB Store and .container file.
private String metadataPath;

-// Path to Physical file system where chunks are stored.
-private String chunksPath;

//Type of DB used to store key to chunks mapping
private String containerDBType;

@@ -164,22 +161,6 @@ public void updateBlockCommitSequenceId(long id) {
this.blockCommitSequenceId = id;
}

-/**
-* Get chunks path.
-* @return - Path where chunks are stored
-*/
-public String getChunksPath() {
-return chunksPath;
-}
-
-/**
-* Set chunks Path.
-* @param chunkPath - File path.
-*/
-public void setChunksPath(String chunkPath) {
-this.chunksPath = chunkPath;
-}

/**
* Returns the DBType used for the container.
* @return containerDBType