From 2310aa5e085e3aead97ab4100daa2676763a6e7b Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Sun, 1 Jul 2018 20:42:56 +0800 Subject: [PATCH] rename s3 classes to be reuse-able --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../BackedInputStream.java} | 4 ++-- .../BlockAwareSegmentInputStream.java | 3 +-- .../DataBlockHeader.java | 3 +-- .../OffloadIndexBlock.java | 2 +- .../OffloadIndexBlockBuilder.java | 4 ++-- .../OffloadIndexEntry.java | 2 +- .../BlockAwareSegmentInputStreamImpl.java | 4 ++-- .../impl/DataBlockHeaderImpl.java | 4 ++-- .../impl/OffloadIndexBlockBuilderImpl.java | 6 +++--- .../impl/OffloadIndexBlockImpl.java | 7 +++---- .../impl/OffloadIndexEntryImpl.java | 4 ++-- .../impl/S3BackedInputStreamImpl.java | 8 ++++---- .../impl/S3BackedReadHandleImpl.java | 19 +++++++++---------- .../impl}/S3ManagedLedgerOffloader.java | 8 ++++---- .../S3BackedInputStreamTest.java | 18 ++++++++---------- .../broker/{s3offload => offload}/S3Mock.java | 2 +- .../{s3offload => offload}/S3TestBase.java | 7 ++----- .../BlockAwareSegmentInputStreamTest.java | 6 ++---- .../impl/DataBlockHeaderTest.java | 4 ++-- .../impl/OffloadIndexTest.java | 8 ++++---- .../impl}/S3ManagedLedgerOffloaderTest.java | 12 ++++++------ 22 files changed, 63 insertions(+), 74 deletions(-) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload/S3BackedInputStream.java => offload/BackedInputStream.java} (90%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/BlockAwareSegmentInputStream.java (96%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/DataBlockHeader.java (95%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/OffloadIndexBlock.java (98%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/OffloadIndexBlockBuilder.java (95%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/OffloadIndexEntry.java (97%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/impl/BlockAwareSegmentInputStreamImpl.java (98%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/impl/DataBlockHeaderImpl.java (97%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/impl/OffloadIndexBlockBuilderImpl.java (94%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/impl/OffloadIndexBlockImpl.java (98%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/impl/OffloadIndexEntryImpl.java (94%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/impl/S3BackedInputStreamImpl.java (95%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload}/impl/S3BackedReadHandleImpl.java (92%) rename pulsar-broker/src/main/java/org/apache/pulsar/broker/{s3offload => offload/impl}/S3ManagedLedgerOffloader.java (98%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/{s3offload => offload}/S3BackedInputStreamTest.java (91%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/{s3offload => offload}/S3Mock.java (99%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/{s3offload => offload}/S3TestBase.java (84%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/{s3offload => offload}/impl/BlockAwareSegmentInputStreamTest.java (99%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/{s3offload => offload}/impl/DataBlockHeaderTest.java (96%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/{s3offload => offload}/impl/OffloadIndexTest.java (97%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/{s3offload => offload/impl}/S3ManagedLedgerOffloaderTest.java (98%) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 484235fe275d1..2a341afba7e29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -61,7 +61,7 @@ import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader; +import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3BackedInputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java similarity index 90% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3BackedInputStream.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java index 28e68550f698b..b596dc5b5a904 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3BackedInputStream.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java @@ -16,12 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; import java.io.InputStream; import java.io.IOException; -public abstract class S3BackedInputStream extends InputStream { +public abstract class BackedInputStream extends InputStream { public abstract void seek(long position); public abstract void seekForward(long position) throws IOException; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/BlockAwareSegmentInputStream.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java similarity index 96% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/BlockAwareSegmentInputStream.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java index 50d9e3728ac98..7ddd9cc527bac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/BlockAwareSegmentInputStream.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java @@ -16,9 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; -import java.io.IOException; import java.io.InputStream; import org.apache.bookkeeper.client.api.ReadHandle; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java similarity index 95% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java index f49b6a2d4f17b..6e2021f791f50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java @@ -16,9 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; -import java.io.IOException; import java.io.InputStream; import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java index 2bb59b4518c93..e340283fa1d9d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; import java.io.Closeable; import java.io.FilterInputStream; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java similarity index 95% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java index 126b4b90573b7..7975a6f7612fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java @@ -16,14 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; import java.io.IOException; import java.io.InputStream; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; -import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockBuilderImpl; +import org.apache.pulsar.broker.offload.impl.OffloadIndexBlockBuilderImpl; /** * Interface for builder of index block used for offload a ledger to long term storage. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java index 6a976ffc8fa6b..dc98e42e98390 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java index 78d23802dfd4d..06d2198d07293 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import static com.google.common.base.Preconditions.checkState; @@ -32,7 +32,7 @@ import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream; +import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java similarity index 97% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java index 19a644a36365d..4e3a5a3215399 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import com.google.common.io.CountingInputStream; @@ -27,7 +27,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.offload.DataBlockHeader; /** * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java similarity index 94% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java index 1b03f009a0c70..0b64de9599bef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import static com.google.common.base.Preconditions.checkState; @@ -25,8 +25,8 @@ import java.io.InputStream; import java.util.List; import org.apache.bookkeeper.client.api.LedgerMetadata; -import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; -import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; +import org.apache.pulsar.broker.offload.OffloadIndexBlock; +import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; /** * Interface for builder of index block used for offload a ledger to long term storage. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java index e910150c79ecb..c109035ab2d08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import static com.google.common.base.Preconditions.checkState; @@ -31,7 +31,6 @@ import java.io.DataInputStream; import java.io.IOException; -import java.io.FilterInputStream; import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -47,8 +46,8 @@ import org.apache.bookkeeper.proto.DataFormats; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State; -import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; -import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; +import org.apache.pulsar.broker.offload.OffloadIndexBlock; +import org.apache.pulsar.broker.offload.OffloadIndexEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java similarity index 94% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java index b83de85e227a3..ff5e9ce7b7832 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java @@ -16,9 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; -import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; +import org.apache.pulsar.broker.offload.OffloadIndexEntry; /** * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java similarity index 95% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java index 65f233783eaff..e55e61bf54956 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; @@ -29,13 +29,13 @@ import java.io.InputStream; import java.io.IOException; -import org.apache.pulsar.broker.s3offload.S3BackedInputStream; -import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck; +import org.apache.pulsar.broker.offload.BackedInputStream; +import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3BackedInputStreamImpl extends S3BackedInputStream { +public class S3BackedInputStreamImpl extends BackedInputStream { private static final Logger log = LoggerFactory.getLogger(S3BackedInputStreamImpl.class); private final AmazonS3 s3client; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java similarity index 92% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java index 65acbb848b21f..08b5ea6c37b22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java @@ -16,12 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import io.netty.buffer.ByteBuf; @@ -44,11 +43,11 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; -import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; -import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; -import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; -import org.apache.pulsar.broker.s3offload.S3BackedInputStream; -import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck; +import org.apache.pulsar.broker.offload.OffloadIndexBlock; +import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; +import org.apache.pulsar.broker.offload.OffloadIndexEntry; +import org.apache.pulsar.broker.offload.BackedInputStream; +import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,12 +57,12 @@ public class S3BackedReadHandleImpl implements ReadHandle { private final long ledgerId; private final OffloadIndexBlock index; - private final S3BackedInputStream inputStream; + private final BackedInputStream inputStream; private final DataInputStream dataStream; private final ExecutorService executor; private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, - S3BackedInputStream inputStream, + BackedInputStream inputStream, ExecutorService executor) { this.ledgerId = ledgerId; this.index = index; @@ -201,7 +200,7 @@ public static ReadHandle open(ScheduledExecutorService executor, OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent()); - S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, + BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, versionCheck, index.getDataObjectLength(), readBufferSize); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java similarity index 98% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java index f65eda96d8466..ec74d2762c17e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload.impl; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.services.s3.AmazonS3; @@ -32,7 +32,6 @@ import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import com.google.common.base.Strings; -import java.io.InputStream; import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -44,8 +43,9 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; -import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl; +import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream; +import org.apache.pulsar.broker.offload.OffloadIndexBlock; +import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java similarity index 91% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java index 4b758695e4fc1..45bca52d59532 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.spy; @@ -34,11 +34,9 @@ import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.s3offload.impl.S3BackedInputStreamImpl; +import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Slf4j @@ -96,7 +94,7 @@ public void testReadingFullObject() throws Exception { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); assertStreamsMatch(toTest, toCompare); @@ -113,7 +111,7 @@ public void testReadingFullObjectByBytes() throws Exception { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); assertStreamsMatchByBytes(toTest, toCompare); @@ -121,7 +119,7 @@ public void testReadingFullObjectByBytes() throws Exception { @Test(expectedExceptions = IOException.class) public void testErrorOnS3Read() throws Exception { - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", + BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", (key, md) -> {}, 1234, 1000); toTest.read(); @@ -147,7 +145,7 @@ public void testSeek() throws Exception { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); for (Map.Entry e : seeks.entrySet()) { @@ -167,7 +165,7 @@ public void testSeekWithinCurrent() throws Exception { s3client.putObject(BUCKET, objectKey, toWrite, metadata); AmazonS3 spiedClient = spy(s3client); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient, BUCKET, objectKey, + BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -208,7 +206,7 @@ public void testSeekForward() throws Exception { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3Mock.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java similarity index 99% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3Mock.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java index 74d48e2be5723..4bfc1401a83b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3Mock.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; import com.amazonaws.services.s3.AbstractAmazonS3; import com.amazonaws.services.s3.model.AmazonS3Exception; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3TestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java similarity index 84% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3TestBase.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java index b56c8502da598..f2ea6c4b33421 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3TestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java @@ -16,18 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload; -import com.amazonaws.auth.AnonymousAWSCredentials; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import org.testng.annotations.BeforeMethod; public class S3TestBase { - final static String BUCKET = "pulsar-unittest"; + public final static String BUCKET = "pulsar-unittest"; protected AmazonS3 s3client = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java similarity index 99% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java index e6f524925045f..757a13531939b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -27,7 +27,6 @@ import com.google.common.primitives.Longs; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.util.concurrent.DefaultThreadFactory; import java.io.ByteArrayInputStream; import java.nio.ByteBuffer; import java.util.Arrays; @@ -35,7 +34,6 @@ import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.function.Supplier; import java.util.stream.IntStream; import lombok.Data; @@ -45,7 +43,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.offload.DataBlockHeader; import org.testng.annotations.Test; import org.testng.collections.Lists; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java similarity index 96% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java index a658032881001..2f8a680c323ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -27,7 +27,7 @@ import java.io.IOException; import java.io.InputStream; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.s3offload.DataBlockHeader; +import org.apache.pulsar.broker.offload.DataBlockHeader; import org.testng.annotations.Test; @Slf4j diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java similarity index 97% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java index 445ba011e3265..f166bd7ab9990 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload.impl; +package org.apache.pulsar.broker.offload.impl; import static com.google.common.base.Charsets.UTF_8; import static org.testng.Assert.assertEquals; @@ -36,9 +36,9 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; -import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; -import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; +import org.apache.pulsar.broker.offload.OffloadIndexBlock; +import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; +import org.apache.pulsar.broker.offload.OffloadIndexEntry; import org.testng.annotations.Test; @Slf4j diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java index ab78f07eb27b5..7b9d9a2fa956f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.s3offload; +package org.apache.pulsar.broker.offload.impl; -import static org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.dataBlockOffloadKey; -import static org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.indexBlockOffloadKey; +import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.dataBlockOffloadKey; +import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.indexBlockOffloadKey; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; @@ -27,7 +27,6 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.ObjectMetadata; -import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Method; import java.io.IOException; import java.util.HashMap; @@ -36,7 +35,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; @@ -53,7 +51,9 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; +import org.apache.pulsar.broker.offload.S3TestBase; +import org.apache.pulsar.broker.offload.impl.DataBlockHeaderImpl; +import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test;