diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 748d0bfc8cd..3578bd6ea1e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import com.google.common.annotations.VisibleForTesting; @@ -192,13 +191,10 @@ protected List getChunkInfos() throws IOException { blockID.getContainerID()); } - if (token != null) { - UserGroupInformation.getCurrentUser().addToken(token); - } DatanodeBlockID datanodeBlockID = blockID .getDatanodeBlockIDProtobuf(); GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClient, datanodeBlockID); + .getBlock(xceiverClient, datanodeBlockID, token); chunks = response.getBlockData().getChunksList(); success = true; @@ -218,7 +214,7 @@ protected List getChunkInfos() throws IOException { */ protected synchronized void addStream(ChunkInfo chunkInfo) { chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, - xceiverClient, verifyChecksum)); + xceiverClient, verifyChecksum, token)); } public synchronized long getRemaining() throws IOException { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 92071754c33..e048708107e 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -51,6 +51,9 @@ import com.google.common.base.Preconditions; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,6 +127,7 @@ public class BlockOutputStream extends OutputStream { private int currentBufferRemaining; //current buffer allocated to write private ChunkBuffer currentBuffer; + private final Token token; /** * Creates a new BlockOutputStream. @@ -136,6 +140,7 @@ public class BlockOutputStream extends OutputStream { * @param streamBufferMaxSize max size of the currentBuffer * @param checksumType checksum type * @param bytesPerChecksum Bytes per checksum + * @param token a token for this block (may be null) */ @SuppressWarnings("parameternumber") public BlockOutputStream(BlockID blockID, @@ -143,7 +148,8 @@ public BlockOutputStream(BlockID blockID, int streamBufferSize, long streamBufferFlushSize, boolean streamBufferFlushDelay, long streamBufferMaxSize, BufferPool bufferPool, ChecksumType checksumType, - int bytesPerChecksum) throws IOException { + int bytesPerChecksum, Token token) + throws IOException { this.blockID = new AtomicReference<>(blockID); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -158,6 +164,7 @@ public BlockOutputStream(BlockID blockID, this.streamBufferFlushDelay = streamBufferFlushDelay; this.bufferPool = bufferPool; this.bytesPerChecksum = bytesPerChecksum; + this.token = token; //number of buffers used before doing a flush refreshCurrentBuffer(bufferPool); @@ -425,7 +432,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close, try { BlockData blockData = containerBlockData.build(); XceiverClientReply asyncReply = - putBlockAsync(xceiverClient, blockData, close); + putBlockAsync(xceiverClient, blockData, close, token); CompletableFuture future = asyncReply.getResponse(); flushFuture = future.thenApplyAsync(e -> { @@ -663,8 +670,8 @@ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException { } try { - XceiverClientReply asyncReply = - writeChunkAsync(xceiverClient, chunkInfo, blockID.get(), data); + XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo, + blockID.get(), data, token); CompletableFuture future = asyncReply.getResponse(); future.thenApplyAsync(e -> { diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index b1f7126ff83..cfb3a21f62b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -31,6 +31,8 @@ import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; import org.apache.hadoop.ozone.common.OzoneChecksumException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import java.io.EOFException; @@ -72,15 +74,19 @@ public class ChunkInputStream extends InputStream implements Seekable { // position. Once the chunk is read, this variable is reset. private long chunkPosition = -1; + private final Token token; + private static final int EOF = -1; ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId, - XceiverClientSpi xceiverClient, boolean verifyChecksum) { + XceiverClientSpi xceiverClient, boolean verifyChecksum, + Token token) { this.chunkInfo = chunkInfo; this.length = chunkInfo.getLen(); this.blockID = blockId; this.xceiverClient = xceiverClient; this.verifyChecksum = verifyChecksum; + this.token = token; } public synchronized long getRemaining() throws IOException { @@ -332,7 +338,7 @@ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException { validators.add(validator); readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, - readChunkInfo, blockID, validators); + readChunkInfo, blockID, validators, token); } catch (IOException e) { if (e instanceof StorageContainerException) { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java index 8405f4349be..e654d119f75 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java @@ -42,7 +42,7 @@ public DummyChunkInputStream(TestChunkInputStream testChunkInputStream, XceiverClientSpi xceiverClient, boolean verifyChecksum, byte[] data) { - super(chunkInfo, blockId, xceiverClient, verifyChecksum); + super(chunkInfo, blockId, xceiverClient, verifyChecksum, null); this.chunkData = data; } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java index 141a1d81e83..545c5889bce 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java @@ -107,7 +107,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool) 32 * 1024 * 1024, bufferPool, ChecksumType.NONE, - 256 * 1024); + 256 * 1024, null); return outputStream; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index ea5fc825ca8..eecc9759509 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -54,13 +54,11 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector; -import org.apache.hadoop.io.Text; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; /** @@ -80,11 +78,13 @@ private ContainerProtocolCalls() { * * @param xceiverClient client to perform call * @param datanodeBlockID blockID to identify container + * @param token a token for this block (may be null) * @return container protocol get block response * @throws IOException if there is an I/O error while performing the call */ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - DatanodeBlockID datanodeBlockID) throws IOException { + DatanodeBlockID datanodeBlockID, + Token token) throws IOException { GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto .newBuilder() .setBlockID(datanodeBlockID); @@ -96,9 +96,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, .setContainerID(datanodeBlockID.getContainerID()) .setDatanodeUuid(id) .setGetBlock(readBlockRequest); - String encodedToken = getEncodedBlockToken(getService(datanodeBlockID)); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); @@ -116,12 +115,14 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, * * @param xceiverClient client to perform call * @param blockID blockId for the Block + * @param token a token for this block (may be null) * @return container protocol getLastCommittedBlockLength response * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.GetCommittedBlockLengthResponseProto getCommittedBlockLength( - XceiverClientSpi xceiverClient, BlockID blockID) + XceiverClientSpi xceiverClient, BlockID blockID, + Token token) throws IOException { ContainerProtos.GetCommittedBlockLengthRequestProto.Builder getBlockLengthRequestBuilder = @@ -134,10 +135,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setGetCommittedBlockLength(getBlockLengthRequestBuilder); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -150,11 +149,13 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, * * @param xceiverClient client to perform call * @param containerBlockData block data to identify container + * @param token a token for this block (may be null) * @return putBlockResponse * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.PutBlockResponseProto putBlock( - XceiverClientSpi xceiverClient, BlockData containerBlockData) + XceiverClientSpi xceiverClient, BlockData containerBlockData, + Token token) throws IOException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder().setBlockData(containerBlockData); @@ -164,10 +165,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock( .setContainerID(containerBlockData.getBlockID().getContainerID()) .setDatanodeUuid(id) .setPutBlock(createBlockRequest); - String encodedToken = - getEncodedBlockToken(getService(containerBlockData.getBlockID())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -181,13 +180,15 @@ public static ContainerProtos.PutBlockResponseProto putBlock( * @param xceiverClient client to perform call * @param containerBlockData block data to identify container * @param eof whether this is the last putBlock for the same block + * @param token a token for this block (may be null) * @return putBlockResponse * @throws IOException if there is an error while performing the call * @throws InterruptedException * @throws ExecutionException */ public static XceiverClientReply putBlockAsync( - XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof) + XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof, + Token token) throws IOException, InterruptedException, ExecutionException { PutBlockRequestProto.Builder createBlockRequest = PutBlockRequestProto.newBuilder() @@ -199,10 +200,8 @@ public static XceiverClientReply putBlockAsync( .setContainerID(containerBlockData.getBlockID().getContainerID()) .setDatanodeUuid(id) .setPutBlock(createBlockRequest); - String encodedToken = - getEncodedBlockToken(getService(containerBlockData.getBlockID())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); return xceiverClient.sendCommandAsync(request); @@ -215,12 +214,14 @@ public static XceiverClientReply putBlockAsync( * @param chunk information about chunk to read * @param blockID ID of the block * @param validators functions to validate the response + * @param token a token for this block (may be null) * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.ReadChunkResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, - List validators) throws IOException { + List validators, + Token token) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto.newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -230,10 +231,8 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id).setReadChunk(readChunkRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto reply = @@ -248,10 +247,12 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( * @param chunk information about chunk to write * @param blockID ID of the block * @param data the data of the chunk to write + * @param token a token for this block (may be null) * @throws IOException if there is an error while performing the call */ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, - BlockID blockID, ByteString data) + BlockID blockID, ByteString data, + Token token) throws IOException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto .newBuilder() @@ -265,10 +266,8 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setWriteChunk(writeChunkRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); xceiverClient.sendCommand(request, getValidatorList()); @@ -281,11 +280,12 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, * @param chunk information about chunk to write * @param blockID ID of the block * @param data the data of the chunk to write + * @param token a token for this block (may be null) * @throws IOException if there is an I/O error while performing the call */ public static XceiverClientReply writeChunkAsync( XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, - ByteString data) + ByteString data, Token token) throws IOException, ExecutionException, InterruptedException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto.newBuilder() @@ -296,10 +296,8 @@ public static XceiverClientReply writeChunkAsync( ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk) .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id).setWriteChunk(writeChunkRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); return xceiverClient.sendCommandAsync(request); @@ -314,12 +312,13 @@ public static XceiverClientReply writeChunkAsync( * @param client - client that communicates with the container. * @param blockID - ID of the block * @param data - Data to be written into the container. + * @param token a token for this block (may be null) * @return container protocol writeSmallFile response * @throws IOException */ public static PutSmallFileResponseProto writeSmallFile( - XceiverClientSpi client, BlockID blockID, byte[] data) - throws IOException { + XceiverClientSpi client, BlockID blockID, byte[] data, + Token token) throws IOException { BlockData containerBlockData = BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()) @@ -354,10 +353,8 @@ public static PutSmallFileResponseProto writeSmallFile( .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setPutSmallFile(putSmallFileRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -477,11 +474,13 @@ public static ReadContainerResponseProto readContainer( * * @param client * @param blockID - ID of the block + * @param token a token for this block (may be null) * @return GetSmallFileResponseProto * @throws IOException */ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, - BlockID blockID) throws IOException { + BlockID blockID, + Token token) throws IOException { GetBlockRequestProto.Builder getBlock = GetBlockRequestProto .newBuilder() .setBlockID(blockID.getDatanodeBlockIDProtobuf()); @@ -497,10 +496,8 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client, .setContainerID(blockID.getContainerID()) .setDatanodeUuid(id) .setGetSmallFile(getSmallFileRequest); - String encodedToken = getEncodedBlockToken(new Text(blockID. - getContainerBlockID().toString())); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); ContainerCommandResponseProto response = @@ -531,32 +528,6 @@ public static void validateContainerResponse( response.getMessage(), response.getResult()); } - /** - * Returns a url encoded block token. Service param should match the service - * field of token. - * @param service - * - * */ - private static String getEncodedBlockToken(Text service) - throws IOException { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - Token token = - OzoneBlockTokenSelector.selectBlockToken(service, ugi.getTokens()); - if (token != null) { - return token.encodeToUrlString(); - } - return null; - } - - private static Text getService(DatanodeBlockID blockId) { - return new Text(new StringBuffer() - .append("conID: ") - .append(blockId.getContainerID()) - .append(" locID: ") - .append(blockId.getLocalID()) - .toString()); - } - public static List getValidatorList() { List validators = new ArrayList<>(1); CheckedBiFunction getValidatorList() { public static HashMap getBlockFromAllNodes( XceiverClientSpi xceiverClient, - DatanodeBlockID datanodeBlockID) throws IOException, - InterruptedException { + DatanodeBlockID datanodeBlockID, + Token token) + throws IOException, InterruptedException { GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto .newBuilder() .setBlockID(datanodeBlockID); @@ -578,14 +550,13 @@ public static List getValidatorList() { = new HashMap<>(); String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.GetBlock) - .setContainerID(datanodeBlockID.getContainerID()) - .setDatanodeUuid(id) - .setGetBlock(readBlockRequest); - String encodedToken = getEncodedBlockToken(getService(datanodeBlockID)); - if (encodedToken != null) { - builder.setEncodedToken(encodedToken); + .newBuilder() + .setCmdType(Type.GetBlock) + .setContainerID(datanodeBlockID.getContainerID()) + .setDatanodeUuid(id) + .setGetBlock(readBlockRequest); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); } ContainerCommandRequestProto request = builder.build(); Map responses = diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 8e3059abc43..81bac07eab6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.scm.storage.BufferPool; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import com.google.common.annotations.VisibleForTesting; @@ -52,7 +51,7 @@ public final class BlockOutputStreamEntry extends OutputStream { private final long length; // the current position of this stream 0 <= currentPosition < length private long currentPosition; - private Token token; + private final Token token; private final int streamBufferSize; private final long streamBufferFlushSize; @@ -109,14 +108,11 @@ long getRemaining() { */ private void checkStream() throws IOException { if (this.outputStream == null) { - if (getToken() != null) { - UserGroupInformation.getCurrentUser().addToken(getToken()); - } this.outputStream = new BlockOutputStream(blockID, xceiverClientManager, pipeline, streamBufferSize, streamBufferFlushSize, streamBufferFlushDelay, streamBufferMaxSize, bufferPool, - checksumType, bytesPerChecksum); + checksumType, bytesPerChecksum, token); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index 71784c5050d..c9decc387d6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -37,7 +37,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; -import org.apache.hadoop.security.UserGroupInformation; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -182,10 +181,8 @@ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, } } - private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) - throws IOException { + private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) { Preconditions.checkNotNull(subKeyInfo.getPipeline()); - UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken()); BlockOutputStreamEntry.Builder builder = new BlockOutputStreamEntry.Builder() .setBlockID(subKeyInfo.getBlockID()) diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java index 70c71d6d7f3..6986b4a90d7 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java @@ -148,11 +148,7 @@ public Builder setToken(Token bToken) { } public OmKeyLocationInfo build() { - if (token == null) { - return new OmKeyLocationInfo(blockID, pipeline, length, offset); - } else { - return new OmKeyLocationInfo(blockID, pipeline, length, offset, token); - } + return new OmKeyLocationInfo(blockID, pipeline, length, offset, token); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index 7ebe1893955..31695134301 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -98,9 +98,9 @@ public void testAllocateWrite() throws Exception { BlockID blockID = ContainerTestHelper.getTestBlockID( container.getContainerInfo().getContainerID()); ContainerProtocolCalls.writeSmallFile(client, blockID, - "data123".getBytes()); + "data123".getBytes(), null); ContainerProtos.GetSmallFileResponseProto response = - ContainerProtocolCalls.readSmallFile(client, blockID); + ContainerProtocolCalls.readSmallFile(client, blockID, null); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); xceiverClientManager.releaseClient(client, false); @@ -124,7 +124,7 @@ public void testInvalidBlockRead() throws Exception { container.getContainerInfo().getContainerID()); // Try to read a Key Container Name ContainerProtos.GetSmallFileResponseProto response = - ContainerProtocolCalls.readSmallFile(client, blockID); + ContainerProtocolCalls.readSmallFile(client, blockID, null); xceiverClientManager.releaseClient(client, false); } @@ -142,7 +142,7 @@ public void testInvalidContainerRead() throws Exception { BlockID blockID = ContainerTestHelper.getTestBlockID( container.getContainerInfo().getContainerID()); ContainerProtocolCalls.writeSmallFile(client, blockID, - "data123".getBytes()); + "data123".getBytes(), null); thrown.expect(StorageContainerException.class); thrown.expectMessage("ContainerID 8888 does not exist"); @@ -151,7 +151,7 @@ public void testInvalidContainerRead() throws Exception { ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, ContainerTestHelper.getTestBlockID( - nonExistContainerID)); + nonExistContainerID), null); xceiverClientManager.releaseClient(client, false); } @@ -170,14 +170,14 @@ public void testReadWriteWithBCSId() throws Exception { container.getContainerInfo().getContainerID()); ContainerProtos.PutSmallFileResponseProto responseProto = ContainerProtocolCalls - .writeSmallFile(client, blockID1, "data123".getBytes()); + .writeSmallFile(client, blockID1, "data123".getBytes(), null); long bcsId = responseProto.getCommittedBlockLength().getBlockID() .getBlockCommitSequenceId(); try { blockID1.setBlockCommitSequenceId(bcsId + 1); //read a file with higher bcsId than the container bcsId ContainerProtocolCalls - .readSmallFile(client, blockID1); + .readSmallFile(client, blockID1, null); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert @@ -188,12 +188,12 @@ public void testReadWriteWithBCSId() throws Exception { BlockID blockID2 = ContainerTestHelper .getTestBlockID(container.getContainerInfo().getContainerID()); ContainerProtocolCalls - .writeSmallFile(client, blockID2, "data123".getBytes()); + .writeSmallFile(client, blockID2, "data123".getBytes(), null); try { blockID1.setBlockCommitSequenceId(bcsId + 1); //read a file with higher bcsId than the committed bcsId for the block - ContainerProtocolCalls.readSmallFile(client, blockID1); + ContainerProtocolCalls.readSmallFile(client, blockID1, null); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert @@ -201,7 +201,7 @@ public void testReadWriteWithBCSId() throws Exception { } blockID1.setBlockCommitSequenceId(bcsId); ContainerProtos.GetSmallFileResponseProto response = - ContainerProtocolCalls.readSmallFile(client, blockID1); + ContainerProtocolCalls.readSmallFile(client, blockID1, null); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); xceiverClientManager.releaseClient(client, false); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java index 7b4fc5367bb..5efb57e1040 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java @@ -113,7 +113,7 @@ public void tesGetCommittedBlockLength() throws Exception { .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk()); client.sendCommand(putKeyRequest); response = ContainerProtocolCalls - .getCommittedBlockLength(client, blockID); + .getCommittedBlockLength(client, blockID, null); // make sure the block ids in the request and response are same. Assert.assertTrue( BlockID.getFromProtobuf(response.getBlockID()).equals(blockID)); @@ -137,7 +137,7 @@ public void testGetCommittedBlockLengthForInvalidBlock() throws Exception { try { // There is no block written inside the container. The request should // fail. - ContainerProtocolCalls.getCommittedBlockLength(client, blockID); + ContainerProtocolCalls.getCommittedBlockLength(client, blockID, null); Assert.fail("Expected exception not thrown"); } catch (StorageContainerException sce) { Assert.assertTrue(sce.getMessage().contains("Unable to find the block")); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java index 32a0b1d0027..74a7537ab17 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java @@ -156,7 +156,7 @@ private void invokeXceiverClientGetBlock(XceiverClientSpi client) .setContainerID(1) .setLocalID(1) .setBlockCommitSequenceId(1) - .build()); + .build(), null); } private void invokeXceiverClientReadChunk(XceiverClientSpi client) @@ -174,14 +174,14 @@ private void invokeXceiverClientReadChunk(XceiverClientSpi client) .setOffset(100) .build(), bid, - null); + null, null); } private void invokeXceiverClientReadSmallFile(XceiverClientSpi client) throws IOException { BlockID bid = new BlockID(1, 1); bid.setBlockCommitSequenceId(1); - ContainerProtocolCalls.readSmallFile(client, bid); + ContainerProtocolCalls.readSmallFile(client, bid, null); } private XceiverClientReply buildValidResponse() { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java index 4f69da78b90..898aca46f6b 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ChunkKeyHandler.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; -import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientException; @@ -51,8 +50,6 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.shell.OzoneAddress; import org.apache.hadoop.ozone.shell.keys.KeyHandler; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.kohsuke.MetaInfServices; import picocli.CommandLine.Command; import picocli.CommandLine.Parameters; @@ -121,7 +118,6 @@ protected void execute(OzoneClient client, OzoneAddress address) ContainerChunkInfo containerChunkInfo = new ContainerChunkInfo(); long containerId = keyLocation.getContainerID(); chunkPaths.clear(); - Token token = keyLocation.getToken(); Pipeline pipeline = keyLocation.getPipeline(); if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) { pipeline = Pipeline.newBuilder(pipeline) @@ -131,17 +127,14 @@ protected void execute(OzoneClient client, OzoneAddress address) .acquireClientForReadData(pipeline); // Datanode is queried to get chunk information.Thus querying the // OM,SCM and datanode helps us get chunk location information - if (token != null) { - UserGroupInformation.getCurrentUser().addToken(token); - } ContainerProtos.DatanodeBlockID datanodeBlockID = keyLocation.getBlockID() .getDatanodeBlockIDProtobuf(); // doing a getBlock on all nodes HashMap responses = null; try { - responses = ContainerProtocolCalls - .getBlockFromAllNodes(xceiverClient, datanodeBlockID); + responses = ContainerProtocolCalls.getBlockFromAllNodes( + xceiverClient, datanodeBlockID, keyLocation.getToken()); } catch (InterruptedException e) { LOG.error("Execution interrupted due to " + e); }