Merge branch 'master' into HDDS-4123
* master: (23 commits)
  HDDS-4122. Implement OM Delete Expired Open Key Request and Response (apache#1435)
  HDDS-4336. ContainerInfo does not persist BCSID (sequenceId) leading to failed replica reports (apache#1488)
  Remove extra serialization from getBlockID (apache#1470)
  HDDS-4262. Use ClientID and CallID from Rpc Client to detect retry requests (apache#1436)
  HDDS-4285. Read is slow due to frequent calls to UGI.getCurrentUser() and getTokens() (apache#1454)
  HDDS-4312. findbugs check succeeds despite compile error (apache#1476)
  HDDS-4311. Type-safe config design doc points to OM HA (apache#1477)
  HDDS-3814. Drop a column family through debug cli tool (apache#1083)
  HDDS-3728. Bucket space: check quotaUsageInBytes when write key and allocate block. (apache#1458)
  HDDS-4316. Upgrade to angular 1.8.0 due to CVE-2020-7676 (apache#1481)
  HDDS-4325. Incompatible return codes from Ozone getconf -confKey (apache#1485). Contributed by Doroszlai, Attila.
  HDDS-4309. Fix inconsistency in recon config keys starting with recon and not ozone (apache#1478)
  HDDS-4310: Ozone getconf broke the compatibility (apache#1475)
  HDDS-4298. Use an interface in Ozone client instead of XceiverClientManager (apache#1460)
  HDDS-4280. Document notable configurations for Recon. (apache#1448)
  HDDS-4156. add hierarchical layout to Chinese doc (apache#1368)
  HDDS-4242. Copy PrefixInfo proto to new project hadoop-ozone/interface-storage (apache#1444)
  HDDS-4264. Uniform naming conventions of Ozone Shell Options. (apache#1447)
  HDDS-4271. Avoid logging chunk content in Ozone Insight (apache#1466)
  HDDS-4299. Display Ratis version with ozone version (apache#1464)
  ...
errose28 committed Oct 14, 2020
2 parents b1816d4 + 7db0ea8 commit 7100ff7
Showing 144 changed files with 3,483 additions and 1,034 deletions.
4 changes: 2 additions & 2 deletions LICENSE.txt
@@ -231,9 +231,9 @@ MIT License
hadoop-hdds/framework/src/main/resources/webapps/static/bootstrap-3.4.1
hadoop-hdds/docs/themes/ozonedoc/static/css/bootstrap-*

hadoop-hdds/framework/src/main/resources/webapps/static/angular-route-1.7.9.min.js
hadoop-hdds/framework/src/main/resources/webapps/static/angular-route-1.8.0.min.js
hadoop-hdds/framework/src/main/resources/webapps/static/angular-nvd3-1.0.9.min.js
hadoop-hdds/framework/src/main/resources/webapps/static/angular-1.7.9.min.js
hadoop-hdds/framework/src/main/resources/webapps/static/angular-1.8.0.min.js

hadoop-hdds/framework/src/main/resources/webapps/static/jquery-3.5.1.min.js
hadoop-hdds/docs/themes/ozonedoc/static/js/jquery-3.5.1.min.js
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;

import java.io.IOException;

import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

/**
* Interface to provide XceiverClient when needed.
*/
public interface XceiverClientFactory {

XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException;

void releaseClient(XceiverClientSpi xceiverClient, boolean invalidateClient);

XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException;

void releaseClientForReadData(XceiverClientSpi xceiverClient, boolean b);

}
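
For orientation, a minimal sketch of the acquire/release discipline the new interface implies. The helper method is hypothetical; the factory calls are the ones declared above, and the boolean mirrors the releaseClient(..., invalidateClient) parameter:

import java.io.IOException;

import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

// Hypothetical caller: every acquired client must be released exactly
// once, even when the read fails part-way.
final class ReadClientSketch {
  static void withReadClient(XceiverClientFactory factory, Pipeline pipeline)
      throws IOException {
    XceiverClientSpi client = factory.acquireClientForReadData(pipeline);
    try {
      // ... issue read RPCs through 'client' ...
    } finally {
      // false = keep the cached connection for reuse; true would invalidate it
      factory.releaseClientForReadData(client, false);
    }
  }
}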
@@ -18,41 +18,37 @@

package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.Closeable;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.NO_REPLICA_FOUND;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
@@ -66,7 +62,7 @@
* without reestablishing connection. But the connection will be closed if
* not being used for a period of time.
*/
public class XceiverClientManager implements Closeable {
public class XceiverClientManager implements Closeable, XceiverClientFactory {
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);
//TODO : change this to SCM configuration class
@@ -278,10 +274,6 @@ public void close() {
}
}

public Function<ByteBuffer, ByteString> byteBufferToByteStringConversion(){
return ByteStringConversion.createByteBufferConversion(conf);
}

/**
* Get xceiver client metric.
*/
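The class javadoc above promises that a connection "will be closed if not being used for a period of time", and the Guava cache imports (CacheBuilder, RemovalListener) hint at the mechanism. A self-contained sketch of that idle-eviction pattern — illustrative only, not the actual XceiverClientManager code, which keys and sizes its cache differently:

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;

// Entries unused for 10 minutes are evicted, and the removal listener
// closes the evicted connection. The 10-minute window is a placeholder.
final class IdleEvictionSketch {
  private final Cache<String, Closeable> clients = CacheBuilder.newBuilder()
      .expireAfterAccess(10, TimeUnit.MINUTES)
      .removalListener((RemovalNotification<String, Closeable> n) -> {
        try {
          n.getValue().close();
        } catch (IOException e) {
          // eviction must not propagate close failures
        }
      })
      .build();
}

The next hunks are from BlockInputStream.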
@@ -18,32 +18,32 @@

package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
* An {@link InputStream} called from KeyInputStream to read a block from the
* container.
@@ -62,7 +62,7 @@ public class BlockInputStream extends InputStream implements Seekable {
private Pipeline pipeline;
private final Token<OzoneBlockTokenIdentifier> token;
private final boolean verifyChecksum;
private XceiverClientManager xceiverClientManager;
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private boolean initialized = false;

@@ -99,23 +99,24 @@ public class BlockInputStream extends InputStream implements Seekable {

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientManager xceiverClientManager,
XceiverClientFactory xceiverClientFctry,
Function<BlockID, Pipeline> refreshPipelineFunction) {
this.blockID = blockId;
this.length = blockLen;
this.pipeline = pipeline;
this.token = token;
this.verifyChecksum = verifyChecksum;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClientFactory = xceiverClientFctry;
this.refreshPipelineFunction = refreshPipelineFunction;
}

public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientManager xceiverClientManager) {
XceiverClientManager xceiverClientFactory
) {
this(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientManager, null);
xceiverClientFactory, null);
}
/**
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
@@ -181,7 +182,7 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
pipeline = Pipeline.newBuilder(pipeline)
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
}
xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
boolean success = false;
List<ChunkInfo> chunks;
try {
@@ -190,19 +191,16 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
blockID.getContainerID());
}

if (token != null) {
UserGroupInformation.getCurrentUser().addToken(token);
}
DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID);
.getBlock(xceiverClient, datanodeBlockID, token);

chunks = response.getBlockData().getChunksList();
success = true;
} finally {
if (!success) {
xceiverClientManager.releaseClientForReadData(xceiverClient, false);
xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
}
}

@@ -216,7 +214,7 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
*/
protected synchronized void addStream(ChunkInfo chunkInfo) {
chunkStreams.add(new ChunkInputStream(chunkInfo, blockID,
xceiverClient, verifyChecksum));
xceiverClient, verifyChecksum, token));
}

public synchronized long getRemaining() throws IOException {
@@ -378,9 +376,9 @@ public boolean seekToNewSource(long targetPos) throws IOException {

@Override
public synchronized void close() {
if (xceiverClientManager != null && xceiverClient != null) {
xceiverClientManager.releaseClient(xceiverClient, false);
xceiverClientManager = null;
if (xceiverClientFactory != null && xceiverClient != null) {
xceiverClientFactory.releaseClient(xceiverClient, false);
xceiverClientFactory = null;
xceiverClient = null;
}
}
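Two threads run through the BlockInputStream changes above: the class now depends on the narrow XceiverClientFactory interface rather than XceiverClientManager, and the block token is no longer attached to the process-wide UGI but passed with each call (consistent with the UGI.getCurrentUser() cost called out by HDDS-4285 in the merge list). Side by side, using the calls from the hunks:

// Removed: token piggy-backs on the shared UGI before the RPC.
if (token != null) {
  UserGroupInformation.getCurrentUser().addToken(token);
}
GetBlockResponseProto response = ContainerProtocolCalls
    .getBlock(xceiverClient, datanodeBlockID);

// Added: the token travels with the individual call, and ChunkInputStream
// now receives it the same way.
GetBlockResponseProto response = ContainerProtocolCalls
    .getBlock(xceiverClient, datanodeBlockID, token);

The remaining hunks are from BlockOutputStream.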
@@ -37,7 +37,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -51,6 +51,9 @@
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;

import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +83,7 @@ public class BlockOutputStream extends OutputStream {
private AtomicReference<BlockID> blockID;

private final BlockData.Builder containerBlockData;
private XceiverClientManager xceiverClientManager;
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final int bytesPerChecksum;
private int chunkIndex;
@@ -124,40 +127,44 @@ public class BlockOutputStream extends OutputStream {
private int currentBufferRemaining;
//current buffer allocated to write
private ChunkBuffer currentBuffer;
private final Token<? extends TokenIdentifier> token;

/**
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param xceiverClientManager client manager that controls client
* @param xceiverClientFactory client manager that controls client
* @param pipeline pipeline where block will be written
* @param bufferPool pool of buffers
* @param streamBufferFlushSize flush size
* @param streamBufferMaxSize max size of the currentBuffer
* @param checksumType checksum type
* @param bytesPerChecksum Bytes per checksum
* @param token a token for this block (may be null)
*/
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
XceiverClientFactory xceiverClientFactory, Pipeline pipeline,
int streamBufferSize, long streamBufferFlushSize,
boolean streamBufferFlushDelay, long streamBufferMaxSize,
BufferPool bufferPool, ChecksumType checksumType,
int bytesPerChecksum) throws IOException {
int bytesPerChecksum, Token<? extends TokenIdentifier> token)
throws IOException {
this.blockID = new AtomicReference<>(blockID);
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
this.containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.xceiverClientFactory = xceiverClientFactory;
this.xceiverClient = xceiverClientFactory.acquireClient(pipeline);
this.streamBufferSize = streamBufferSize;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.streamBufferFlushDelay = streamBufferFlushDelay;
this.bufferPool = bufferPool;
this.bytesPerChecksum = bytesPerChecksum;
this.token = token;

//number of buffers used before doing a flush
refreshCurrentBuffer(bufferPool);
@@ -425,7 +432,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
try {
BlockData blockData = containerBlockData.build();
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close);
putBlockAsync(xceiverClient, blockData, close, token);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
@@ -477,7 +484,7 @@ ContainerCommandResponseProto> executePutBlock(boolean close,

@Override
public void flush() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0
&& (!streamBufferFlushDelay ||
writtenDataLength - totalDataFlushedLength >= streamBufferSize)) {
@@ -543,7 +550,7 @@ private void handleFlush(boolean close)

@Override
public void close() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0) {
try {
handleFlush(true);
@@ -604,10 +611,10 @@ private void setIoException(Exception e) {
}

public void cleanup(boolean invalidateClient) {
if (xceiverClientManager != null) {
xceiverClientManager.releaseClient(xceiverClient, invalidateClient);
if (xceiverClientFactory != null) {
xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
}
xceiverClientManager = null;
xceiverClientFactory = null;
xceiverClient = null;
commitWatcher.cleanup();
if (bufferList != null) {
@@ -663,8 +670,8 @@ private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
}

try {
XceiverClientReply asyncReply =
writeChunkAsync(xceiverClient, chunkInfo, blockID.get(), data);
XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, token);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
future.thenApplyAsync(e -> {
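The write side mirrors the read side: BlockOutputStream accepts any XceiverClientFactory plus an optional block token, and forwards the token to putBlockAsync and writeChunkAsync as shown in the hunks above. A hypothetical construction with the new signature — every value here is a placeholder, and per the javadoc the token may be null:

// Hypothetical caller of the new constructor; all arguments illustrative.
BlockOutputStream out = new BlockOutputStream(
    blockID,                  // BlockID being written
    clientFactory,            // any XceiverClientFactory, e.g. XceiverClientManager
    pipeline,                 // destination pipeline
    streamBufferSize, streamBufferFlushSize,
    streamBufferFlushDelay, streamBufferMaxSize,
    bufferPool,               // shared BufferPool
    checksumType, bytesPerChecksum,
    token);                   // block token; may be null on unsecured clusters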