Merge branch 'tmp-dir-refactor' into improve-volume-scanner
* tmp-dir-refactor: (73 commits)
  HDDS-8587. Test that CertificateClient can store multiple rootCA certificates (apache#4852)
  HDDS-8801. ReplicationManager: Add metric to count how often replication is throttled (apache#4864)
  HDDS-8477. Unit test for Snapdiff using tombstone entries (apache#4678)
  HDDS-7507. [Snapshot] Implement List Snapshot API Pagination (apache#4065) (apache#4861)
  HDDS-8373. Document that setquota doesn't accept decimals (apache#4856)
  HDDS-8779. Recon - Expose flag for enable/disable of heatmap. (apache#4845)
  HDDS-8677. Ozone admin OM CLI command for block tokens (apache#4760)
  HDDS-8164. Authorize secret key APIs (apache#4597)
  HDDS-7945. Integrate secret keys to SCM snapshot (apache#4549)
  HDDS-8003. E2E integration test cases for block tokens (apache#4547)
  HDDS-7831. Use symmetric secret key to sign and verify token (apache#4417)
  HDDS-7830. SCM API for OM and Datanode to get secret keys (apache#4345)
  HDDS-7734. Implement symmetric SecretKeys lifecycle management in SCM (apache#4194)
  HDDS-8679. Add dedicated, configurable thread pool for OM gRPC server (apache#4771)
  HDDS-8790. Split EC acceptance tests (apache#4855)
  HDDS-8714. TestScmHAFinalization: mark testFinalizationWithRestart as flaky, enable other test cases
  HDDS-8787. Reduce ozone sh calls in robot tests (apache#4854)
  HDDS-8774. Log allocation stack trace for leaked CodecBuffer (apache#4840)
  HDDS-8729. Add metric for count of blocks pending deletion on datanode (apache#4800)
  HDDS-8780. Leak of ManagedChannel in HASecurityUtils (apache#4850)
  ...
errose28 committed Jun 10, 2023
2 parents fa6b5b5 + ccded0a commit b581bc9
Showing 466 changed files with 15,369 additions and 6,316 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/ci.yml
@@ -153,7 +153,7 @@ jobs:
run: hadoop-ozone/dev-support/checks/build.sh -Dskip.npx -Dskip.installnpx -Djavac.version=${{ matrix.java }}
env:
OZONE_WITH_COVERAGE: false
CANCEL_NATIVE_VERSION_CHECK: true
SKIP_NATIVE_VERSION_CHECK: true
- name: Delete temporary build artifacts before caching
run: |
#Never cache local artifacts
@@ -173,12 +173,12 @@ jobs:
steps:
- name: Checkout project
uses: actions/checkout@v3
if: matrix.check != 'bats'
if: matrix.check != 'bats' && matrix.check != 'unit'
- name: Checkout project with history
uses: actions/checkout@v3
with:
fetch-depth: 0
if: matrix.check == 'bats'
if: matrix.check == 'bats' || matrix.check == 'unit'
- name: Cache for maven dependencies
uses: actions/cache@v3
with:
@@ -254,6 +254,7 @@ jobs:
- secure
- unsecure
- compat
- EC
- HA-secure
- HA-unsecure
- MR
@@ -337,8 +338,10 @@ jobs:
- flaky
fail-fast: false
steps:
- name: Checkout project
- name: Checkout project with history
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Cache for maven dependencies
uses: actions/cache@v3
with:
12 changes: 12 additions & 0 deletions dev-support/ci/selective_ci_checks.bats
@@ -321,6 +321,18 @@ load bats-assert/load.bash
assert_output -p needs-kubernetes-tests=false
}

@test "compose README" {
run dev-support/ci/selective_ci_checks.sh 85a0700980

assert_output -p 'basic-checks=["rat"]'
assert_output -p needs-build=false
assert_output -p needs-compile=false
assert_output -p needs-compose-tests=false
assert_output -p needs-dependency-check=false
assert_output -p needs-integration-tests=false
assert_output -p needs-kubernetes-tests=false
}

@test "other README" {
run dev-support/ci/selective_ci_checks.sh 5532981a7

2 changes: 2 additions & 0 deletions dev-support/ci/selective_ci_checks.sh
@@ -230,6 +230,7 @@ function get_count_compose_files() {
)
local ignore_array=(
"^hadoop-ozone/dist/src/main/k8s"
"\.md$"
)
filter_changed_files true
COUNT_COMPOSE_CHANGED_FILES=${match_count}
@@ -285,6 +286,7 @@ function get_count_kubernetes_files() {
)
local ignore_array=(
"^hadoop-ozone/dist/src/main/compose"
"\.md$"
)
filter_changed_files true
COUNT_KUBERNETES_CHANGED_FILES=${match_count}
XceiverClientGrpc.java
@@ -46,7 +46,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
@@ -85,7 +84,8 @@
* how it works, and how it is integrated with the Ozone client.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientGrpc.class);
private final Pipeline pipeline;
private final ConfigurationSource config;
private final Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
@@ -235,7 +235,7 @@ private boolean isConnected(ManagedChannel channel) {
* and the method waits to finish all ongoing communication.
*
* Note: the method wait 1 hour per channel tops and if that is not enough
* to finish ongoing communication, then interrupts the connection anyways.
* to finish ongoing communication, then interrupts the connection anyway.
*/
@Override
public synchronized void close() {
@@ -317,7 +317,7 @@ public ContainerCommandResponseProto sendCommand(

@Override
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
try {
XceiverClientReply reply;
Expand All @@ -335,7 +335,7 @@ public ContainerCommandResponseProto sendCommand(
}

private XceiverClientReply sendCommandWithTraceIDAndRetry(
ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {

String spanName = "XceiverClientGrpc." + request.getCmdType().name();
@@ -352,13 +352,13 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
ContainerCommandRequestProto request, List<Validator> validators)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round robin fashion.
// datanodes in the pipeline in a round-robin fashion.
XceiverClientReply reply = new XceiverClientReply(null);
List<DatanodeDetails> datanodeList = null;

@@ -406,8 +406,8 @@ private XceiverClientReply sendCommandWithRetry(
reply.addDatanode(dn);
responseProto = sendCommandAsync(request, dn).getResponse().get();
if (validators != null && !validators.isEmpty()) {
for (CheckedBiFunction validator : validators) {
validator.apply(request, responseProto);
for (Validator validator : validators) {
validator.accept(request, responseProto);
}
}
if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
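For context on the hunks above: this merge replaces the raw CheckedBiFunction validators with a named Validator callback whose accept(...) throws on failure instead of returning a value. A minimal sketch of the shape this implies; the interface lives under XceiverClientSpi per the import in the next file, but the bodies here are illustrative, not the actual Ozone source:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;

@FunctionalInterface
interface Validator {
  // Inspect one request/response pair; throw IOException to reject it.
  void accept(ContainerCommandRequestProto request,
      ContainerCommandResponseProto response) throws IOException;
}

// Hypothetical helper mirroring the loop in sendCommandWithRetry above.
final class ValidatorRunner {
  static void runAll(List<Validator> validators,
      ContainerCommandRequestProto request,
      ContainerCommandResponseProto response) throws IOException {
    for (Validator validator : validators) {
      validator.accept(request, response);
    }
  }
}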
BlockDataStreamOutput.java
@@ -205,6 +205,10 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
.setContainerID(blockID.get().getContainerID())
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);

if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}

ContainerCommandRequestMessage message =
ContainerCommandRequestMessage.toMessage(builder.build(), null);

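The added null check above matters because block tokens are only issued on secure clusters; with security off, token stays null and the request must go out without one. A hedged sketch of the same guard in isolation (the helper class and method names are assumptions; Token and encodeToUrlString() are standard Hadoop APIs):

import java.io.IOException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.security.token.Token;

final class TokenAttacher {
  // Attach the encoded token only when one exists; unsecured clusters
  // leave the field unset rather than failing on a null token.
  static void attachIfPresent(ContainerCommandRequestProto.Builder builder,
      Token<?> token) throws IOException {
    if (token != null) {
      builder.setEncodedToken(token.encodeToUrlString());
    }
  }
}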
BlockInputStream.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -259,11 +260,11 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
}
}

private static final List<CheckedBiFunction> VALIDATORS
private static final List<Validator> VALIDATORS
= ContainerProtocolCalls.toValidatorList(
(request, response) -> validate(response));

static void validate(ContainerCommandResponseProto response)
private static void validate(ContainerCommandResponseProto response)
throws IOException {
if (!response.hasGetBlock()) {
throw new IllegalArgumentException("Not GetBlock: response=" + response);
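For readers following the VALIDATORS change above: toValidatorList(...) in ContainerProtocolCalls wraps a caller-supplied check into the List<Validator> that sendCommand expects. One plausible implementation, shown strictly as a sketch; getDefaultValidators() is an illustrative stand-in for whatever base checks the real method bundles in:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ValidatorLists {
  static List<Validator> toValidatorList(Validator extra) {
    // Base checks first, the caller-specific check last.
    List<Validator> validators = new ArrayList<>(getDefaultValidators());
    validators.add(extra);
    return Collections.unmodifiableList(validators);
  }

  // Hypothetical source of shared validators applied to every command.
  static List<Validator> getDefaultValidators() {
    return Collections.emptyList();
  }
}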
BufferPool.java
@@ -26,20 +26,28 @@
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.ozone.common.ChunkBuffer;

import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;

import static java.util.Collections.emptyList;

/**
* This class creates and manages pool of n buffers.
*/
public class BufferPool {

private List<ChunkBuffer> bufferList;
private static final BufferPool EMPTY = new BufferPool(0, 0);

private final List<ChunkBuffer> bufferList;
private int currentBufferIndex;
private final int bufferSize;
private final int capacity;
private final Function<ByteBuffer, ByteString> byteStringConversion;

public static BufferPool empty() {
return EMPTY;
}

public BufferPool(int bufferSize, int capacity) {
this(bufferSize, capacity,
ByteStringConversion.createByteBufferConversion(false));
@@ -49,7 +57,7 @@ public BufferPool(int bufferSize, int capacity,
Function<ByteBuffer, ByteString> byteStringConversion) {
this.capacity = capacity;
this.bufferSize = bufferSize;
bufferList = new ArrayList<>(capacity);
bufferList = capacity == 0 ? emptyList() : new ArrayList<>(capacity);
currentBufferIndex = -1;
this.byteStringConversion = byteStringConversion;
}
@@ -72,26 +80,33 @@ ChunkBuffer getCurrentBuffer() {
* chunk size.
*/
public ChunkBuffer allocateBuffer(int increment) {
currentBufferIndex++;
Preconditions.checkArgument(currentBufferIndex <= capacity - 1);
final int nextBufferIndex = currentBufferIndex + 1;

Preconditions.assertTrue(nextBufferIndex < capacity, () ->
"next index: " + nextBufferIndex + " >= capacity: " + capacity);

currentBufferIndex = nextBufferIndex;

if (currentBufferIndex < bufferList.size()) {
return getBuffer(currentBufferIndex);
} else {
final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize, increment);
bufferList.add(newBuffer);
Preconditions.checkArgument(bufferList.size() <= capacity);
return newBuffer;
}
}

void releaseBuffer(ChunkBuffer chunkBuffer) {
Preconditions.assertTrue(!bufferList.isEmpty(), "empty buffer list");
Preconditions.assertSame(bufferList.get(0), chunkBuffer,
"only the first buffer can be released");
Preconditions.assertTrue(currentBufferIndex >= 0,
() -> "current buffer: " + currentBufferIndex);

// always remove from head of the list and append at last
final ChunkBuffer buffer = bufferList.remove(0);
// Ensure the buffer to be removed is always at the head of the list.
Preconditions.checkArgument(buffer == chunkBuffer);
buffer.clear();
bufferList.add(buffer);
Preconditions.checkArgument(currentBufferIndex >= 0);
currentBufferIndex--;
}

Expand All @@ -101,7 +116,7 @@ public void clearBufferPool() {
}

public void checkBufferPoolEmpty() {
Preconditions.checkArgument(computeBufferData() == 0);
Preconditions.assertSame(0, computeBufferData(), "total buffer size");
}

public long computeBufferData() {
@@ -131,4 +146,8 @@ public int getNumberOfUsedBuffers() {
public int getCapacity() {
return capacity;
}

public int getBufferSize() {
return bufferSize;
}
}
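Taken together, the reworked assertions pin down BufferPool's contract: buffers are handed out in order up to capacity, recycled through the list, and only the oldest outstanding buffer may be released. A usage sketch of that contract (the buffer size and the increment argument are arbitrary values for illustration):

// Two 4 KiB buffers at most.
BufferPool pool = new BufferPool(4096, 2);
ChunkBuffer first = pool.allocateBuffer(0);   // index 0, freshly allocated
ChunkBuffer second = pool.allocateBuffer(0);  // index 1
// pool.allocateBuffer(0);     // would fail: next index 2 >= capacity 2
pool.releaseBuffer(first);     // fine: first is at the head of the list
// pool.releaseBuffer(second); // would fail: only the head can be released
BufferPool empty = BufferPool.empty();        // shared zero-capacity pool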
ChunkInputStream.java
@@ -26,12 +26,11 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
Expand All @@ -48,6 +47,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -275,7 +275,7 @@ public synchronized long getPos() {
}

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
public boolean seekToNewSource(long targetPos) {
return false;
}

@@ -324,7 +324,7 @@ private synchronized int prepareRead(int len) throws IOException {
if (buffersHaveData()) {
// Data is available from buffers
ByteBuffer bb = buffers[bufferIndex];
return len > bb.remaining() ? bb.remaining() : len;
return Math.min(len, bb.remaining());
} else if (dataRemainingInChunk()) {
// There is more data in the chunk stream which has not
// been read into the buffers yet.
@@ -424,7 +424,7 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {
ReadChunkResponseProto readChunkResponse;

List<CheckedBiFunction> validators =
List<Validator> validators =
ContainerProtocolCalls.toValidatorList(validator);

readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
@@ -443,8 +443,7 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
}
}

private CheckedBiFunction<ContainerCommandRequestProto,
ContainerCommandResponseProto, IOException> validator =
private final Validator validator =
(request, response) -> {
final ChunkInfo reqChunkInfo =
request.getReadChunk().getChunkData();
@@ -642,15 +641,7 @@ private boolean dataRemainingInChunk() {
* Check if current buffer had been read till the end.
*/
private boolean bufferEOF() {
if (!allocated) {
// Chunk data has not been read yet
return false;
}

if (!buffers[bufferIndex].hasRemaining()) {
return true;
}
return false;
return allocated && !buffers[bufferIndex].hasRemaining();
}

/**
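The validator field above (only partially shown in this hunk) guards ReadChunk replies. A standalone sketch of the kind of check involved, confirming the datanode returned the chunk that was actually requested; this is illustrative and omits the checksum verification the surrounding code also performs:

import java.io.IOException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;

final class ReadChunkValidation {
  static void validate(ContainerCommandRequestProto request,
      ContainerCommandResponseProto response) throws IOException {
    final ChunkInfo requested = request.getReadChunk().getChunkData();
    final ChunkInfo returned = response.getReadChunk().getChunkData();
    // A name mismatch means the reply does not belong to this request.
    if (!requested.getChunkName().equals(returned.getChunkName())) {
      throw new IOException("Requested chunk " + requested.getChunkName()
          + " but received " + returned.getChunkName());
    }
  }
}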