GCS offload support (3): add configs to support GCS driver #2151

Merged: 13 commits, Jul 24, 2018
19 changes: 19 additions & 0 deletions conf/broker.conf
@@ -487,6 +487,8 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
### --- Ledger Offloading --- ###

# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage)
# When using google-cloud-storage, make sure both the Google Cloud Storage and Google Cloud Storage JSON APIs are enabled for
# the project (check from Developers Console -> APIs & auth -> APIs).
managedLedgerOffloadDriver=

# Maximum number of thread pool threads for ledger offloading
@@ -507,6 +509,23 @@ s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576

# For Google Cloud Storage ledger offload, region where offload bucket is located.
# reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
gcsManagedLedgerOffloadRegion=

# For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into
gcsManagedLedgerOffloadBucket=

# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum)
Contributor:

The 5MB minimum was an AWS-imposed constraint. It's probably different for GCS.

Contributor Author:

Thanks. Yes, it is also 5 MB in GCS:
https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload
It seems GCS aligned this limit with AWS S3.

gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864

# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default)
gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576

# For Google Cloud Storage, path to json file containing service account credentials.
# For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
gcsManagedLedgerOffloadServiceAccountKeyFile=
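# Example (illustrative values only; the bucket name, region and key path below are placeholders,
# not shipped defaults). A minimal GCS offload configuration might look like:
#   managedLedgerOffloadDriver=google-cloud-storage
#   gcsManagedLedgerOffloadRegion=europe-west1
#   gcsManagedLedgerOffloadBucket=pulsar-topic-offload
#   gcsManagedLedgerOffloadServiceAccountKeyFile=/path/to/google_creds.json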

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
ServiceConfiguration.java
@@ -502,6 +502,25 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(minValue = 1024)
private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB

// For Google Cloud Storage ledger offload, region where offload bucket is located.
// reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
private String gcsManagedLedgerOffloadRegion = null;

// For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into
private String gcsManagedLedgerOffloadBucket = null;

// For Google Cloud Storage ledger offload, Max block size in bytes.
@FieldContext(minValue = 5242880) // 5MB
private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB

// For Google Cloud Storage ledger offload, Read buffer size in bytes.
@FieldContext(minValue = 1024)
private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB

// For Google Cloud Storage, path to json file containing service account credentials.
// For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;

public String getZookeeperServers() {
return zookeeperServers;
}
@@ -1733,6 +1752,46 @@ public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
}

public void setGcsManagedLedgerOffloadRegion(String region) {
this.gcsManagedLedgerOffloadRegion = region;
}

public String getGcsManagedLedgerOffloadRegion() {
return this.gcsManagedLedgerOffloadRegion;
}

public void setGcsManagedLedgerOffloadBucket(String bucket) {
this.gcsManagedLedgerOffloadBucket = bucket;
}

public String getGcsManagedLedgerOffloadBucket() {
return this.gcsManagedLedgerOffloadBucket;
}

public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int blockSizeInBytes) {
this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
}

public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() {
return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes;
}

public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) {
this.gcsManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes;
}

public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() {
return this.gcsManagedLedgerOffloadReadBufferSizeInBytes;
}

public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String keyPath) {
this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath;
}

public String getGcsManagedLedgerOffloadServiceAccountKeyFile() {
return this.gcsManagedLedgerOffloadServiceAccountKeyFile;
}

public void setBrokerServiceCompactionMonitorIntervalInSeconds(int interval) {
this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
}
BlobStoreManagedLedgerOffloader.java
@@ -20,11 +20,15 @@

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -49,9 +53,11 @@
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.domain.Credentials;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.googlecloud.GoogleCredentialsFromJson;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.s3.reference.S3Constants;
@@ -63,6 +69,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {

public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"};

// Use these metadata keys for both the s3 and gcs drivers; they are written lower-cased below
// because GCS returns user metadata keys in lower case.
static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";
static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
@@ -72,11 +79,19 @@ public static boolean driverSupported(String driver) {
return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver));
}

public static boolean isS3Driver(String driver) {
return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1]);
}

public static boolean isGcsDriver(String driver) {
return driver.equalsIgnoreCase(DRIVER_NAMES[2]);
}

private static void addVersionInfo(BlobBuilder blobBuilder) {
blobBuilder.userMetadata(ImmutableMap.of(
METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION,
METADATA_SOFTWARE_VERSION_KEY, PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha()));
METADATA_FORMAT_VERSION_KEY.toLowerCase(), CURRENT_VERSION,
METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getGitSha()));
}

private final VersionCheck VERSION_CHECK = (key, blob) -> {
@@ -104,30 +119,91 @@ public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf,
OrderedScheduler scheduler)
throws PulsarServerException {
String driver = conf.getManagedLedgerOffloadDriver();
String region = conf.getS3ManagedLedgerOffloadRegion();
String bucket = conf.getS3ManagedLedgerOffloadBucket();
String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
int readBufferSize = conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
if (!driverSupported(driver)) {
throw new PulsarServerException(
"Not support this kind of driver as offload backend: " + driver);
}

if (Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
String region = isS3Driver(driver) ?
conf.getS3ManagedLedgerOffloadRegion() :
conf.getGcsManagedLedgerOffloadRegion();
String bucket = isS3Driver(driver) ?
conf.getS3ManagedLedgerOffloadBucket() :
conf.getGcsManagedLedgerOffloadBucket();
int maxBlockSize = isS3Driver(driver) ?
conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes() :
conf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes();
int readBufferSize = isS3Driver(driver) ?
conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes() :
conf.getGcsManagedLedgerOffloadReadBufferSizeInBytes();

if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
throw new PulsarServerException(
"Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set"
+ " if s3 offload enabled");
}

if (Strings.isNullOrEmpty(bucket)) {
throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty if s3 offload enabled");
throw new PulsarServerException(
"ManagedLedgerOffloadBucket cannot be empty for s3 and gcs offload");
}
if (maxBlockSize < 5*1024*1024) {
throw new PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB");
throw new PulsarServerException(
"ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for s3 and gcs offload");
}

return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region);
Credentials credentials = getCredentials(driver, conf);

return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region, credentials);
}
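// Usage sketch (illustrative only; the bucket name and key path are placeholders): with the new
// gcs* settings populated, create() resolves the GCS values, loads the service account
// credentials, and builds the offloader. Given an OrderedScheduler named scheduler:
//   ServiceConfiguration conf = new ServiceConfiguration();
//   conf.setManagedLedgerOffloadDriver("google-cloud-storage");
//   conf.setGcsManagedLedgerOffloadBucket("pulsar-topic-offload");
//   conf.setGcsManagedLedgerOffloadServiceAccountKeyFile("/path/to/google_creds.json");
//   LedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(conf, scheduler);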

public static Credentials getCredentials(String driver, ServiceConfiguration conf) throws PulsarServerException {
// credentials:
// for s3, obtained via DefaultAWSCredentialsProviderChain.
// for gcs, read from the downloaded service account key file (e.g. 'google_creds.json'),
// created by following the instructions at https://support.google.com/googleapi/answer/6158849

if (isGcsDriver(driver)) {
String gcsKeyPath = conf.getGcsManagedLedgerOffloadServiceAccountKeyFile();
if (Strings.isNullOrEmpty(gcsKeyPath)) {
throw new PulsarServerException(
"The service account key path is empty for GCS driver");
}
try {
String gcsKeyContent = Files.toString(new File(gcsKeyPath), Charset.defaultCharset());
return new GoogleCredentialsFromJson(gcsKeyContent).get();
} catch (IOException ioe) {
log.error("Cannot read GCS service account credentials file: {}", gcsKeyPath);
throw new PulsarServerException(ioe);
}
} else if (isS3Driver(driver)) {
AWSCredentials credentials = null;
try {
DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
credentials = creds.getCredentials();
} catch (Exception e) {
// allowed, some mock s3 services do not need credentials
log.error("Exception when get credentials for s3 ", e);
throw new PulsarServerException(e);
}

String id = "accesskey";
String key = "secretkey";
if (credentials != null) {
id = credentials.getAWSAccessKeyId();
key = credentials.getAWSSecretKey();
}
return new Credentials(id, key);
} else {
throw new PulsarServerException(
"Not support this kind of driver: " + driver);
}
}
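// The service account key file referenced above is the JSON key downloaded from the Google Cloud
// console (see the "Service Accounts" link in broker.conf). Illustrative shape only; the values
// are placeholders. GoogleCredentialsFromJson reads the client_email/private_key pair from it:
//   {
//     "type": "service_account",
//     "project_id": "my-project",
//     "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
//     "client_email": "offload@my-project.iam.gserviceaccount.com"
//   }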

// build context for jclouds BlobStoreContext
BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize, String endpoint, String region) {
int maxBlockSize, int readBufferSize, String endpoint, String region, Credentials credentials) {
this.scheduler = scheduler;
this.readBufferSize = readBufferSize;

@@ -142,24 +218,9 @@ public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf,
overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100));

ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
contextBuilder.credentials(credentials.identity, credentials.credential);

AWSCredentials credentials = null;
try {
DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
credentials = creds.getCredentials();
} catch (Exception e) {
log.error("Exception when get credentials for s3 ", e);
}

String id = "accesskey";
String key = "secretkey";
if (credentials != null) {
id = credentials.getAWSAccessKeyId();
key = credentials.getAWSSecretKey();
}
contextBuilder.credentials(id, key);

if (!Strings.isNullOrEmpty(endpoint)) {
if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
contextBuilder.endpoint(endpoint);
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
}
@@ -174,7 +235,8 @@ public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf,
this.blobStore = context.getBlobStore();
}

// build context for jclouds BlobStoreContext
// build context for jclouds BlobStoreContext, mostly used in test
@VisibleForTesting
BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize) {
this.scheduler = scheduler;
@@ -192,6 +254,14 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
}

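// Bucket management helpers: createBucket provisions the configured container at the offloader's
// location and deleteBucket removes it (presumably intended for tests and manual provisioning).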
public boolean createBucket() {
return blobStore.createContainerInLocation(location, bucket);
}

public void deleteBucket() {
blobStore.deleteContainer(bucket);
}

// upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block,
@Override
public CompletableFuture<Void> offload(ReadHandle readHandle,
BlobStoreManagedLedgerOffloaderTest.java
@@ -26,6 +26,7 @@
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -179,6 +180,55 @@ public void testSmallBlockSizeConfigured() throws Exception {
}
}

@Test
public void testGcsNoKeyPath() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setManagedLedgerOffloadDriver("google-cloud-storage");
conf.setGcsManagedLedgerOffloadBucket(BUCKET);

try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
log.error("Expected pse", pse);
}
}

@Test
public void testGcsNoBucketConfigured() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setManagedLedgerOffloadDriver("google-cloud-storage");
File tmpKeyFile = File.createTempFile("gcsOffload", "json");
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());

try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
log.error("Expected pse", pse);
}
}

@Test
public void testGcsSmallBlockSizeConfigured() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setManagedLedgerOffloadDriver("google-cloud-storage");
File tmpKeyFile = File.createTempFile("gcsOffload", "json");
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
conf.setGcsManagedLedgerOffloadBucket(BUCKET);
conf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(1024);

try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
log.error("Expected pse", pse);
}
}

@Test
public void testOffloadAndRead() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);