Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCS offload support (2): replace the S3 client API with the jclouds BlobStore API #2065

Merged
merged 11 commits into from
Jul 20, 2018
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe

### --- Ledger Offloading --- ###

# Driver to use to offload old data to long term storage (Possible values: S3)
# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage)
managedLedgerOffloadDriver=

# Maximum number of thread pool threads for ledger offloading
Expand Down
10 changes: 8 additions & 2 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ The Apache Software License, Version 2.0
- io.swagger-swagger-annotations-1.5.3.jar
- io.swagger-swagger-core-1.5.3.jar
- io.swagger-swagger-models-1.5.3.jar
* DataSketches
* DataSketches
- com.yahoo.datasketches-memory-0.8.3.jar
- com.yahoo.datasketches-sketches-core-0.8.3.jar
* Apache Commons
* Apache Commons
- commons-beanutils-commons-beanutils-1.7.0.jar
- commons-beanutils-commons-beanutils-core-1.8.0.jar
- commons-cli-commons-cli-1.2.jar
Expand Down Expand Up @@ -461,6 +461,12 @@ The Apache Software License, Version 2.0
- org.xerial.snappy-snappy-java-1.1.1.3.jar
* Flatbuffers Java
- com.google.flatbuffers-flatbuffers-java-1.9.0.jar
* Apache Jclouds
- org.apache.jclouds-allblobstore-2.2.0-SNAPSHOT.jar
* Google Guice Core Library
 - com.google.inject-guice-3.0.jar
 - com.google.inject.extensions-guice-multibindings-3.0.jar
 - com.google.inject.extensions-guice-assistedinject-3.0.jar


BSD 3-clause "New" or "Revised" License
Expand Down
105 changes: 105 additions & 0 deletions jclouds-shaded/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<?xml version="1.0"?>
<!--

    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.

-->
<project
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar</artifactId>
    <version>2.2.0-incubating-SNAPSHOT</version>
    <relativePath>..</relativePath>
  </parent>

  <artifactId>jclouds-shaded</artifactId>
  <name>Apache Pulsar :: Jclouds shaded</name>

  <dependencies>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.jclouds</groupId>
      <artifactId>jclouds-allblobstore</artifactId>
      <!-- NOTE(review): the released 2.2.1 has a GCS multi-part upload issue, so the
           SNAPSHOT is used for now; it must be replaced with a released jclouds
           version before an actual release (tracked in #2164). -->
      <version>2.2.0-SNAPSHOT</version>
    </dependency>
  </dependencies>

  <!-- Snapshot repository is required only while the jclouds SNAPSHOT dependency above is in use. -->
  <repositories>
    <repository>
      <id>jclouds-snapshots</id>
      <url>https://repository.apache.org/content/repositories/snapshots</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>true</createDependencyReducedPom>
              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
              <minimizeJar>false</minimizeJar>

              <artifactSet>
                <includes>
                  <include>com.google.code.gson:gson</include>
                  <include>com.google.guava:guava</include>
                  <include>org.apache.jclouds:*</include>
                  <include>org.apache.jclouds.api:*</include>
                  <include>org.apache.jclouds.common:*</include>
                  <include>org.apache.jclouds.provider:*</include>
                  <include>com.google.inject.extensions:guice-assistedinject</include>
                  <include>com.google.inject:guice</include>
                  <include>com.google.inject.extensions:guice-multibindings</include>
                </includes>
              </artifactSet>

              <!-- Relocate com.google.* to avoid the gson version conflict
                   (https://issues.apache.org/jira/browse/JCLOUDS-1166). -->
              <relocations>
                <relocation>
                  <pattern>com.google</pattern>
                  <shadedPattern>org.apache.pulsar.shaded.com.google</shadedPattern>
                </relocation>
              </relocations>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ flexible messaging model and an intuitive client API.</description>
<module>pulsar-zookeeper</module>
<module>pulsar-log4j2-appender</module>
<module>protobuf-shaded</module>
<!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 -->
<module>jclouds-shaded</module>

<!-- functions-related modules -->
<module>pulsar-functions</module>
Expand Down
8 changes: 7 additions & 1 deletion pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<!-- functions related dependencies (begin) -->
Expand Down Expand Up @@ -273,6 +273,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to remove the aws s3 dependency too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh — as you commented, it seems the AWSCredentials-related classes cannot be removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need the full s3 dependency now, just whatever AWSCredentials is in.


<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>jclouds-shaded</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
Expand Down Expand Up @@ -657,8 +657,8 @@ public LedgerOffloader getManagedLedgerOffloader() {
public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException {
if (conf.getManagedLedgerOffloadDriver() != null
&& conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME)) {
return S3ManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf));
&& BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) {
return BlobStoreManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf));
} else {
return NullLedgerOffloader.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,22 @@
*/
package org.apache.pulsar.broker.offload.impl;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.io.InputStream;
import java.io.IOException;

import java.io.InputStream;
import org.apache.pulsar.broker.offload.BackedInputStream;
import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;

import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.GetOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3BackedInputStreamImpl extends BackedInputStream {
private static final Logger log = LoggerFactory.getLogger(S3BackedInputStreamImpl.class);
public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class);

private final AmazonS3 s3client;
private final BlobStore blobStore;
private final String bucket;
private final String key;
private final VersionCheck versionCheck;
Expand All @@ -50,10 +45,10 @@ public class S3BackedInputStreamImpl extends BackedInputStream {
private long bufferOffsetStart;
private long bufferOffsetEnd;

public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String key,
VersionCheck versionCheck,
long objectLen, int bufferSize) {
this.s3client = s3client;
public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,
VersionCheck versionCheck,
long objectLen, int bufferSize) {
this.blobStore = blobStore;
this.bucket = bucket;
this.key = key;
this.versionCheck = versionCheck;
Expand All @@ -76,26 +71,24 @@ private boolean refillBufferIfNeeded() throws IOException {
long startRange = cursor;
long endRange = Math.min(cursor + bufferSize - 1,
objectLen - 1);
GetObjectRequest req = new GetObjectRequest(bucket, key)
.withRange(startRange, endRange);
log.debug("Reading range {}-{} from {}/{}", startRange, endRange, bucket, key);
try (S3Object obj = s3client.getObject(req)) {
versionCheck.check(key, obj.getObjectMetadata());

Long[] range = obj.getObjectMetadata().getContentRange();
long bytesRead = range[1] - range[0] + 1;

buffer.clear();
bufferOffsetStart = range[0];
bufferOffsetEnd = range[1];
InputStream s = obj.getObjectContent();
int bytesToCopy = (int)bytesRead;
while (bytesToCopy > 0) {
bytesToCopy -= buffer.writeBytes(s, bytesToCopy);
try {
Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
versionCheck.check(key, blob);

try (InputStream stream = blob.getPayload().openStream()) {
buffer.clear();
bufferOffsetStart = startRange;
bufferOffsetEnd = endRange;
long bytesRead = endRange - startRange + 1;
int bytesToCopy = (int) bytesRead;
while (bytesToCopy > 0) {
bytesToCopy -= buffer.writeBytes(stream, bytesToCopy);
}
cursor += buffer.readableBytes();
}
cursor += buffer.readableBytes();
} catch (AmazonClientException e) {
throw new IOException("Error reading from S3", e);
} catch (Throwable e) {
throw new IOException("Error reading from BlobStore", e);
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@
package org.apache.pulsar.broker.offload.impl;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
Expand All @@ -42,28 +36,28 @@
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;

import org.apache.pulsar.broker.offload.BackedInputStream;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.broker.offload.OffloadIndexEntry;
import org.apache.pulsar.broker.offload.BackedInputStream;
import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;

import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3BackedReadHandleImpl implements ReadHandle {
private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class);
public class BlobStoreBackedReadHandleImpl implements ReadHandle {
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);

private final long ledgerId;
private final OffloadIndexBlock index;
private final BackedInputStream inputStream;
private final DataInputStream dataStream;
private final ExecutorService executor;

private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream,
ExecutorService executor) {
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream,
ExecutorService executor) {
this.ledgerId = ledgerId;
this.index = index;
this.inputStream = inputStream;
Expand Down Expand Up @@ -189,22 +183,19 @@ public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsyn
}

public static ReadHandle open(ScheduledExecutorService executor,
AmazonS3 s3client, String bucket, String key, String indexKey,
BlobStore blobStore, String bucket, String key, String indexKey,
VersionCheck versionCheck,
long ledgerId, int readBufferSize)
throws AmazonClientException, IOException {
GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
try (S3Object obj = s3client.getObject(req)) {
versionCheck.check(indexKey, obj.getObjectMetadata());

OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());

BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
versionCheck,
index.getDataObjectLength(),
readBufferSize);
return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
}
Blob blob = blobStore.getBlob(bucket, indexKey);
versionCheck.check(indexKey, blob);
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
OffloadIndexBlock index = indexBuilder.fromStream(blob.getPayload().openStream());

BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
versionCheck,
index.getDataObjectLength(),
readBufferSize);
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
}
}