Skip to content

Commit

Permalink
[tiered storage] move offloader implementation to a separate module (#…
Browse files Browse the repository at this point in the history
…2371)

### Motivation

We need the tiered storage implementation to be available to Pulsar SQL.
We also need to package the different offloaders as NAR plugins in order to
avoid version conflicts.

 ### Changes

- Create `tiered-storage` and `tiered-storage/jcloud` modules.
- Move `org.apache.pulsar.broker.offload` to `tiered-storage/jcloud`.
- Add `TieredStorageConfigurationData` to hold all tiered-storage-related configuration.

 ### NOTES

This change mainly relocates files.
  • Loading branch information
sijie committed Aug 14, 2018
1 parent 4810a26 commit 870e459
Show file tree
Hide file tree
Showing 26 changed files with 292 additions and 57 deletions.
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -76,6 +76,7 @@ flexible messaging model and an intuitive client API.</description>
<module>buildtools</module> <module>buildtools</module>
<module>managed-ledger</module> <module>managed-ledger</module>
<module>managed-ledger-shaded</module> <module>managed-ledger-shaded</module>
<module>tiered-storage</module>
<module>pulsar-common</module> <module>pulsar-common</module>
<module>pulsar-broker-common</module> <module>pulsar-broker-common</module>
<module>pulsar-broker</module> <module>pulsar-broker</module>
Expand Down
16 changes: 6 additions & 10 deletions pulsar-broker/pom.xml
Expand Up @@ -97,6 +97,12 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>


<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tiered-storage-jcloud</artifactId>
<version>${project.version}</version>
</dependency>

<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker-common</artifactId> <artifactId>pulsar-broker-common</artifactId>
Expand Down Expand Up @@ -125,11 +131,6 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>


<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<!-- functions related dependencies (begin) --> <!-- functions related dependencies (begin) -->


<dependency> <dependency>
Expand Down Expand Up @@ -274,11 +275,6 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>


<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>jclouds-shaded</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>


<build> <build>
Expand Down
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles; import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;


import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader; import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -654,16 +656,51 @@ public LedgerOffloader getManagedLedgerOffloader() {
return offloader; return offloader;
} }


// Keys of the metadata entries handed to BlobStoreManagedLedgerOffloader.create().
// createManagedLedgerOffloader() lower-cases each key before use and maps the first
// to the broker's normalized version string and the second to the broker's git SHA.
// NOTE(review): the "S3" prefix is kept even though GCS settings are also forwarded
// to the offloader - presumably for compatibility with already-written metadata; confirm.
// TODO: improve the user metadata in subsequent changes
static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";


public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException { throws PulsarServerException {
if (conf.getManagedLedgerOffloadDriver() != null if (conf.getManagedLedgerOffloadDriver() != null
&& BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) { && BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) {
return BlobStoreManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf)); try {
return BlobStoreManagedLedgerOffloader.create(
getTieredStorageConf(conf),
ImmutableMap.of(
METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getGitSha()
),
getOffloaderScheduler(conf));
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
}
} else { } else {
return NullLedgerOffloader.INSTANCE; return NullLedgerOffloader.INSTANCE;
} }
} }


/**
 * Copies the ledger-offload settings of the broker configuration into a fresh
 * {@link TieredStorageConfigurationData}.
 *
 * @param serverConf broker configuration to read from
 * @return a new tiered storage configuration carrying the driver, thread-count,
 *         S3 and GCS offload settings of {@code serverConf}
 */
private static TieredStorageConfigurationData getTieredStorageConf(ServiceConfiguration serverConf) {
    final TieredStorageConfigurationData offloadConf = new TieredStorageConfigurationData();

    // Driver selection and thread pool sizing.
    offloadConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads());
    offloadConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver());

    // Amazon S3.
    offloadConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket());
    offloadConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion());
    offloadConf.setS3ManagedLedgerOffloadServiceEndpoint(
            serverConf.getS3ManagedLedgerOffloadServiceEndpoint());
    offloadConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(
            serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
    offloadConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(
            serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());

    // Google Cloud Storage.
    offloadConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket());
    offloadConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion());
    offloadConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(
            serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile());
    offloadConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(
            serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
    offloadConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(
            serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());

    return offloadConf;
}

public ZooKeeperCache getLocalZkCache() { public ZooKeeperCache getLocalZkCache() {
return localZkCache; return localZkCache;
} }
Expand Down
58 changes: 58 additions & 0 deletions tiered-storage/jcloud/pom.xml
@@ -0,0 +1,58 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!-- Maven module definition for the jclouds-based tiered storage offloader. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Inherits from the tiered-storage parent POM located one directory up. -->
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>tiered-storage-parent</artifactId>
<version>2.2.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>tiered-storage-jcloud</artifactId>
<name>Apache Pulsar :: Tiered Storage :: JCloud</name>

<dependencies>
<!-- Managed ledger main artifact, same version as this module. -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Pulsar's shaded jclouds artifact, same version as this module. -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>jclouds-shaded</artifactId>
<version>${project.version}</version>
</dependency>
<!-- No version is declared here; presumably it is managed by a parent POM. -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>
<!-- Test-jar of managed-ledger-original, test scope only; distinct from the main artifact above. -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.offload;

import java.io.Serializable;
import lombok.Data;

/**
 * Configuration for tiered storage.
 *
 * <p>Plain data holder for the ledger-offloading settings (generic, Amazon S3 and
 * Google Cloud Storage). Accessors, {@code equals}, {@code hashCode} and
 * {@code toString} are generated by Lombok's {@code @Data}.
 */
@Data
public class TieredStorageConfigurationData implements Serializable, Cloneable {

    /**
     * Explicit serial version so the serialized form does not depend on the
     * compiler-computed default, which changes whenever the class shape changes.
     */
    private static final long serialVersionUID = 1L;

    /**** --- Ledger Offloading --- ****/

    /** Driver to use to offload old data to long term storage. */
    private String managedLedgerOffloadDriver = null;

    /** Maximum number of thread pool threads for ledger offloading. */
    private int managedLedgerOffloadMaxThreads = 2;

    /** For Amazon S3 ledger offload, AWS region. */
    private String s3ManagedLedgerOffloadRegion = null;

    /** For Amazon S3 ledger offload, bucket to place offloaded ledger into. */
    private String s3ManagedLedgerOffloadBucket = null;

    /** For Amazon S3 ledger offload, alternative endpoint to connect to (useful for testing). */
    private String s3ManagedLedgerOffloadServiceEndpoint = null;

    /** For Amazon S3 ledger offload, max block size in bytes (default 64MB). */
    private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;

    /** For Amazon S3 ledger offload, read buffer size in bytes (default 1MB). */
    private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024;

    /**
     * For Google Cloud Storage ledger offload, region where offload bucket is located.
     * Reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
     */
    private String gcsManagedLedgerOffloadRegion = null;

    /** For Google Cloud Storage ledger offload, bucket to place offloaded ledger into. */
    private String gcsManagedLedgerOffloadBucket = null;

    /** For Google Cloud Storage ledger offload, max block size in bytes (default 64MB). */
    private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;

    /** For Google Cloud Storage ledger offload, read buffer size in bytes (default 1MB). */
    private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024;

    /**
     * For Google Cloud Storage, path to json file containing service account credentials.
     * For more details, see the "Service Accounts" section of
     * https://support.google.com/googleapi/answer/6158849
     */
    private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;

}
Expand Up @@ -18,7 +18,6 @@
*/ */
package org.apache.pulsar.broker.offload.impl; package org.apache.pulsar.broker.offload.impl;


import com.amazonaws.AmazonClientException;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import java.io.DataInputStream; import java.io.DataInputStream;
Expand Down Expand Up @@ -186,7 +185,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
BlobStore blobStore, String bucket, String key, String indexKey, BlobStore blobStore, String bucket, String key, String indexKey,
VersionCheck versionCheck, VersionCheck versionCheck,
long ledgerId, int readBufferSize) long ledgerId, int readBufferSize)
throws AmazonClientException, IOException { throws IOException {
Blob blob = blobStore.getBlob(bucket, indexKey); Blob blob = blobStore.getBlob(bucket, indexKey);
versionCheck.check(indexKey, blob); versionCheck.check(indexKey, blob);
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
Expand Down

0 comments on commit 870e459

Please sign in to comment.