Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-4285. Read is slow due to frequent calls to UGI.getCurrentUser() and getTokens() #1454

Merged
merged 6 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,6 +42,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

Expand Down Expand Up @@ -96,6 +98,7 @@ public class BlockInputStream extends InputStream implements Seekable {
private int chunkIndexOfPrevPosition;

private Function<BlockID, Pipeline> refreshPipelineFunction;
private Collection<Token<? extends TokenIdentifier>> tokens;

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
Expand Down Expand Up @@ -193,10 +196,11 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
if (token != null) {
UserGroupInformation.getCurrentUser().addToken(token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just pass down the token from the caller directly and bypass the UGI addToken/getTokens completely?

}
tokens = UserGroupInformation.getCurrentUser().getTokens();
DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID);
.getBlock(xceiverClient, datanodeBlockID, tokens);

chunks = response.getBlockData().getChunksList();
success = true;
Expand All @@ -216,7 +220,7 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
*/
protected synchronized void addStream(ChunkInfo chunkInfo) {
chunkStreams.add(new ChunkInputStream(chunkInfo, blockID,
xceiverClient, verifyChecksum));
xceiverClient, verifyChecksum, tokens));
}

public synchronized long getRemaining() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -51,6 +52,10 @@
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -124,6 +129,7 @@ public class BlockOutputStream extends OutputStream {
private int currentBufferRemaining;
//current buffer allocated to write
private ChunkBuffer currentBuffer;
private final Collection<Token<? extends TokenIdentifier>> tokens;

/**
* Creates a new BlockOutputStream.
Expand Down Expand Up @@ -176,6 +182,7 @@ public BlockOutputStream(BlockID blockID,
failedServers = new ArrayList<>(0);
ioException = new AtomicReference<>(null);
checksum = new Checksum(checksumType, bytesPerChecksum);
tokens = UserGroupInformation.getCurrentUser().getTokens();
}

private void refreshCurrentBuffer(BufferPool pool) {
Expand Down Expand Up @@ -425,7 +432,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
try {
BlockData blockData = containerBlockData.build();
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close);
putBlockAsync(xceiverClient, blockData, close, tokens);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
Expand Down Expand Up @@ -663,8 +670,8 @@ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
}

try {
XceiverClientReply asyncReply =
writeChunkAsync(xceiverClient, chunkInfo, blockID.get(), data);
XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, tokens);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
future.thenApplyAsync(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;

/**
Expand Down Expand Up @@ -72,15 +75,19 @@ public class ChunkInputStream extends InputStream implements Seekable {
// position. Once the chunk is read, this variable is reset.
private long chunkPosition = -1;

private final Collection<Token<? extends TokenIdentifier>> tokens;

private static final int EOF = -1;

ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
XceiverClientSpi xceiverClient, boolean verifyChecksum) {
XceiverClientSpi xceiverClient, boolean verifyChecksum,
Collection<Token<? extends TokenIdentifier>> tokens) {
this.chunkInfo = chunkInfo;
this.length = chunkInfo.getLen();
this.blockID = blockId;
this.xceiverClient = xceiverClient;
this.verifyChecksum = verifyChecksum;
this.tokens = tokens;
}

public synchronized long getRemaining() throws IOException {
Expand Down Expand Up @@ -332,7 +339,7 @@ protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException {
validators.add(validator);

readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators);
readChunkInfo, blockID, validators, tokens);

} catch (IOException e) {
if (e instanceof StorageContainerException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import static java.util.Collections.emptyList;

/**
* A dummy ChunkInputStream to mock read chunk calls to DN.
*/
Expand All @@ -42,7 +44,7 @@ public DummyChunkInputStream(TestChunkInputStream testChunkInputStream,
XceiverClientSpi xceiverClient,
boolean verifyChecksum,
byte[] data) {
super(chunkInfo, blockId, xceiverClient, verifyChecksum);
super(chunkInfo, blockId, xceiverClient, verifyChecksum, emptyList());
this.chunkData = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

/**
Expand All @@ -84,7 +86,8 @@ private ContainerProtocolCalls() {
* @throws IOException if there is an I/O error while performing the call
*/
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
DatanodeBlockID datanodeBlockID) throws IOException {
DatanodeBlockID datanodeBlockID,
Collection<Token<? extends TokenIdentifier>> tokens) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the Javadoc comment of this method, since we have newly added the param tokens here?
The same suggestion for methods we changed below:

  • putBlockAsync
  • readChunk
  • writeChunkAsync

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @linyiqun for the suggestion. Would

@param tokens list of tokens the current user has, possibly including a token for this block

be OK, or do you have a better description for this param?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adoroszlai , above description looks good to me. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can directly encode the passed-in token for the specific block ID without additional token selection.

GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder()
.setBlockID(datanodeBlockID);
Expand All @@ -96,7 +99,8 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
.setContainerID(datanodeBlockID.getContainerID())
.setDatanodeUuid(id)
.setGetBlock(readBlockRequest);
String encodedToken = getEncodedBlockToken(getService(datanodeBlockID));
String encodedToken = getEncodedBlockToken(getService(datanodeBlockID),
tokens);
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
Expand Down Expand Up @@ -187,7 +191,8 @@ public static ContainerProtos.PutBlockResponseProto putBlock(
* @throws ExecutionException
*/
public static XceiverClientReply putBlockAsync(
XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof)
XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof,
Collection<Token<? extends TokenIdentifier>> tokens)
throws IOException, InterruptedException, ExecutionException {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder()
Expand All @@ -200,7 +205,8 @@ public static XceiverClientReply putBlockAsync(
.setDatanodeUuid(id)
.setPutBlock(createBlockRequest);
String encodedToken =
getEncodedBlockToken(getService(containerBlockData.getBlockID()));
getEncodedBlockToken(getService(containerBlockData.getBlockID()),
tokens);
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
Expand All @@ -220,7 +226,8 @@ public static XceiverClientReply putBlockAsync(
*/
public static ContainerProtos.ReadChunkResponseProto readChunk(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
List<CheckedBiFunction> validators) throws IOException {
List<CheckedBiFunction> validators,
Collection<Token<? extends TokenIdentifier>> tokens) throws IOException {
ReadChunkRequestProto.Builder readChunkRequest =
ReadChunkRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
Expand All @@ -231,7 +238,7 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id).setReadChunk(readChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
getContainerBlockID().toString()), tokens);
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
Expand Down Expand Up @@ -285,7 +292,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
*/
public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
ByteString data)
ByteString data, Collection<Token<? extends TokenIdentifier>> tokens)
throws IOException, ExecutionException, InterruptedException {
WriteChunkRequestProto.Builder writeChunkRequest =
WriteChunkRequestProto.newBuilder()
Expand All @@ -297,7 +304,7 @@ public static XceiverClientReply writeChunkAsync(
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
getContainerBlockID().toString()), tokens);
if (encodedToken != null) {
builder.setEncodedToken(encodedToken);
}
Expand Down Expand Up @@ -540,8 +547,13 @@ public static void validateContainerResponse(
private static String getEncodedBlockToken(Text service)
throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return getEncodedBlockToken(service, ugi.getTokens());
}

private static String getEncodedBlockToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) throws IOException {
Token<OzoneBlockTokenIdentifier> token =
OzoneBlockTokenSelector.selectBlockToken(service, ugi.getTokens());
OzoneBlockTokenSelector.selectBlockToken(service, tokens);
if (token != null) {
return token.encodeToUrlString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static java.util.Collections.emptyList;

/**
* Tests for TestXceiverClientGrpc, to ensure topology-aware reads work,
* select the closest node, and connections are re-used after a getBlock call.
Expand Down Expand Up @@ -156,7 +158,7 @@ private void invokeXceiverClientGetBlock(XceiverClientSpi client)
.setContainerID(1)
.setLocalID(1)
.setBlockCommitSequenceId(1)
.build());
.build(), emptyList());
}

private void invokeXceiverClientReadChunk(XceiverClientSpi client)
Expand All @@ -174,7 +176,7 @@ private void invokeXceiverClientReadChunk(XceiverClientSpi client)
.setOffset(100)
.build(),
bid,
null);
null, emptyList());
}

private void invokeXceiverClientReadSmallFile(XceiverClientSpi client)
Expand Down