Skip to content

Commit

Permalink
Introduce PutBlobOptions in router API (linkedin#983)
Browse files Browse the repository at this point in the history
This will allow request options to be passed into the Router::putBlob
method. In contrast to the BlobProperties, which are persisted with
the blob in storage, PutBlobOptions only influence the router logic.

For now, the options object is empty, but will soon include some options
required for data chunk upload.
  • Loading branch information
cgtz committed Jul 20, 2018
1 parent 0c8a059 commit 5bb777b
Show file tree
Hide file tree
Showing 34 changed files with 817 additions and 121 deletions.
4 changes: 2 additions & 2 deletions NOTICE
Expand Up @@ -11,7 +11,7 @@ CONDITIONS OF ANY KIND, either express or implied.
This product includes software developed by The Apache Software
Foundation (http://www.apache.org/).

This product includes/uses Codehale Metrics (https://github.com/dropwizard/metrics)
This product includes/uses Codehale Metrics (https://github.com/dropwizard/frontendMetrics)
Copyright (C) 2010 Code Hale, Yammer.com
License: Apache 2.0

Expand All @@ -20,7 +20,7 @@ Copyright (c) 2004 QOS.ch
License: MIT

This product includes/uses JOpt Simple (http://jopt-simple.sourceforge.net)
Copyright (C) 2016 JOpt Simple
Copyright (C) 2016 JOpt Simple
License: MIT

This product includes/uses Bouncy Castle (http://www.bouncycastle.org/)
Expand Down
Expand Up @@ -21,6 +21,7 @@
* Configuration parameters required by the Ambry frontend.
*/
public class FrontendConfig {
private static final String PREFIX = "frontend.";

/**
* Cache validity in seconds for non-private blobs for GET.
Expand Down Expand Up @@ -64,6 +65,14 @@ public class FrontendConfig {
@Default("com.github.ambry.frontend.AmbryUrlSigningServiceFactory")
public final String frontendUrlSigningServiceFactory;

/**
* The IdSigningService that needs to be used by AmbryBlobStorageService to sign and verify IDs.
*/
private static final String ID_SIGNING_SERVICE_FACTORY_KEY = PREFIX + "id.signing.service.factory";
@Config(ID_SIGNING_SERVICE_FACTORY_KEY)
@Default("com.github.ambry.frontend.AmbryIdSigningServiceFactory")
public final String frontendIdSigningServiceFactory;

/**
* The comma separated list of prefixes to remove from paths.
*/
Expand Down Expand Up @@ -123,6 +132,16 @@ public class FrontendConfig {
@Default("5 * 60")
public final long frontendUrlSignerDefaultUrlTtlSecs;

private static final String CHUNK_UPLOAD_INITIAL_CHUNK_TTL_SECS_KEY = PREFIX + "chunk.upload.initial.chunk.ttl.secs";
@Config(CHUNK_UPLOAD_INITIAL_CHUNK_TTL_SECS_KEY)
@Default("24 * 60 * 60")
public final long chunkUploadInitialChunkTtlSecs;

private static final String CHUNK_UPLOAD_MAX_CHUNK_SIZE_KEY = PREFIX + "chunk.upload.max.chunk.size";
@Config(CHUNK_UPLOAD_MAX_CHUNK_SIZE_KEY)
@Default("4 * 1024 * 1024")
public final long chunkUploadMaxChunkSize;

public FrontendConfig(VerifiableProperties verifiableProperties) {
frontendCacheValiditySeconds = verifiableProperties.getLong("frontend.cache.validity.seconds", 365 * 24 * 60 * 60);
frontendOptionsValiditySeconds = verifiableProperties.getLong("frontend.options.validity.seconds", 24 * 60 * 60);
Expand All @@ -134,6 +153,8 @@ public FrontendConfig(VerifiableProperties verifiableProperties) {
"com.github.ambry.frontend.AmbrySecurityServiceFactory");
frontendUrlSigningServiceFactory = verifiableProperties.getString("frontend.url.signing.service.factory",
"com.github.ambry.frontend.AmbryUrlSigningServiceFactory");
frontendIdSigningServiceFactory = verifiableProperties.getString(ID_SIGNING_SERVICE_FACTORY_KEY,
"com.github.ambry.frontend.AmbryIdSigningServiceFactory");
frontendPathPrefixesToRemove =
Arrays.asList(verifiableProperties.getString("frontend.path.prefixes.to.remove", "").split(","));
frontendChunkedGetResponseThresholdInBytes =
Expand All @@ -152,5 +173,8 @@ public FrontendConfig(VerifiableProperties verifiableProperties) {
frontendUrlSignerDefaultUrlTtlSecs =
verifiableProperties.getLongInRange("frontend.url.signer.default.url.ttl.secs", 5 * 60, 0,
frontendUrlSignerMaxUrlTtlSecs);
chunkUploadInitialChunkTtlSecs =
verifiableProperties.getLong(CHUNK_UPLOAD_INITIAL_CHUNK_TTL_SECS_KEY, 24 * 60 * 60);
chunkUploadMaxChunkSize = verifiableProperties.getLong(CHUNK_UPLOAD_MAX_CHUNK_SIZE_KEY, 4 * 1024 * 1024);
}
}
@@ -0,0 +1,52 @@
/*
* Copyright 2018 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.frontend;

import com.github.ambry.rest.RestServiceException;
import com.github.ambry.utils.Pair;
import java.util.Map;


/**
* Responsible for providing and verifying blob IDs that are signed with some additional metadata. The implementation
* should include a secure signature field that prevents other parties from tampering with the ID. The implementation
* can also choose to encrypt/decrypt the signed ID. The generated signed ID must be URL-safe.
*/
public interface IdSigningService {

/**
* Get a signed ID based on the input blob ID and provided metadata. May not do any checking to ensure that the
* request is authorized to generate a signed ID.
* @param blobId the blob ID to include in the signed ID.
* @param metadata additional parameters to include in the signed ID.
* @return a URL-safe signed ID.
* @throws RestServiceException if the signed ID could not be generated.
*/
String getSignedId(String blobId, Map<String, String> metadata) throws RestServiceException;

/**
* @param id the input ID to check.
* @return {@code true} if the ID is signed. {@code false} otherwise
*/
boolean isIdSigned(String id);

/**
* Verify that the signed ID has not been tampered with and extract the blob ID and additional metadata from the
* signed ID.
* @return a {@link Pair} that contains the blob ID and additional metadata parsed from the signed ID.
* @throws RestServiceException if there are problems verifying or parsing the ID.
*/
Pair<String, Map<String, String>> parseSignedId(String signedId) throws RestServiceException;
}
@@ -0,0 +1,30 @@
/*
* Copyright 2018 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.frontend;

/**
* IdSigningServiceFactory is a factory to generate all the supporting cast required to instantiate a
* {@link IdSigningService}.
* <p/>
* Usually called with the canonical class name and as such might have to support appropriate (multiple) constructors.
*/
public interface IdSigningServiceFactory {

/**
* Returns an instance of the {@link IdSigningService} that the factory generates.
* @return an instance of {@link IdSigningService} generated by this factory.
*/
IdSigningService getIdSigningService();
}
Expand Up @@ -15,6 +15,7 @@

import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestServiceException;
import java.util.Map;


/**
Expand Down
Expand Up @@ -134,6 +134,13 @@ public interface RestRequest extends ReadableStreamChannel {
*/
byte[] getDigest();

/**
* Gets the number of bytes read from the request body at this point in time. After the request has been fully read,
* this can be used to determine the full body size in bytes.
* @return the current number of bytes read from the request body.
*/
long getBytesReceived();

/**
* @return {@code true} if SSL was used for this request (i.e. the request has an associated {@link SSLSession})
*/
Expand Down
43 changes: 31 additions & 12 deletions ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java
Expand Up @@ -173,14 +173,15 @@ public static final class Headers {
* The blob ID requested by the URL.
*/
public static final String BLOB_ID = "x-ambry-blob-id";
/**
* A signed Blob ID with additional metadata.
*/
public static final String SIGNED_BLOB_ID = "x-ambry-signed-blob-id";
/**
* The signed URL header name in the response for signed url requests.
*/
public static final String SIGNED_URL = "x-ambry-signed-url";
/**
* Boolean field set to "true" for getting chunk upload URLs with {@code GET /signedUrl} that will eventually be
* stitched together.
*/
public static final String CHUNK_UPLOAD = "x-ambry-chunk-upload";
/**
* prefix for any header to be set as user metadata for the given blob
*/
Expand All @@ -195,22 +196,29 @@ public static final class Headers {
* Ambry specific keys used internally in a {@link RestRequest}.
*/
public static final class InternalKeys {
private static final String KEY_PREFIX = "ambry-internal-key-";

/**
* The key for the target {@link com.github.ambry.account.Account} indicated by the request.
*/
public final static String TARGET_ACCOUNT_KEY = "ambry-internal-key-target-account";
public static final String TARGET_ACCOUNT_KEY = KEY_PREFIX + "target-account";

/**
* The key for the target {@link com.github.ambry.account.Container} indicated by the request.
*/
public final static String TARGET_CONTAINER_KEY = "ambry-internal-key-target-container";
public static final String TARGET_CONTAINER_KEY = KEY_PREFIX + "target-container";

/**
* The key for the metadata {@code Map<String, String>} to include in a signed ID. This argument should be non-null
* to indicate that a signed ID should be created and returned to the requester on a POST request.
*/
public static final String SIGNED_ID_METADATA_KEY = KEY_PREFIX + "signed-id-metadata";

/**
* To be set if the operation knows the keep-alive behavior it prefers on error. Valid values are boolean.
* Not authoritative, only a hint
*/
public final static String KEEP_ALIVE_ON_ERROR_HINT = "ambry-internal-key-keep-alive-on-error-hint";
public final static String KEEP_ALIVE_ON_ERROR_HINT = KEY_PREFIX + "keep-alive-on-error-hint";
}

/**
Expand Down Expand Up @@ -246,7 +254,7 @@ public static final class MultipartPost {
private static final int CRC_SIZE = 8;
private static final short USER_METADATA_VERSION_V1 = 1;
private static final String BYTE_RANGE_PREFIX = BYTE_RANGE_UNITS + "=";
private static Logger logger = LoggerFactory.getLogger(RestUtils.class);
private static final Logger logger = LoggerFactory.getLogger(RestUtils.class);

/**
* Builds {@link BlobProperties} given the arguments associated with a request.
Expand Down Expand Up @@ -280,6 +288,10 @@ public static BlobProperties buildBlobProperties(Map<String, Object> args) throw
}

/**
* Builds user metadata given the arguments associated with a request.
* <p>
* The following binary format will be used:
* <pre>
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* | | size | total | | | | | | | |
* | version | excluding | no of | key1 size | key1 | value1 size | value 1 | key2 size | ... | Crc |
Expand All @@ -305,11 +317,8 @@ public static BlobProperties buildBlobProperties(Map<String, Object> args) throw
* key2 size - Size of 2nd key
*
* crc - The crc of the user metadata record
* </pre>
*
*/

/**
* Builds user metadata given the arguments associated with a request.
* @param args the arguments associated with the request.
* @return the user metadata extracted from arguments.
* @throws RestServiceException if usermetadata arguments have null values.
Expand Down Expand Up @@ -586,6 +595,16 @@ public static boolean isPrivate(Map<String, Object> args) throws RestServiceExce
return getBooleanHeader(args, Headers.PRIVATE, false);
}

/**
* Determine if {@link Headers#CHUNK_UPLOAD} is set in the request args.
* @param args The request arguments.
* @return {@code true} if {@link Headers#CHUNK_UPLOAD} is set.
* @throws RestServiceException if exception occurs during parsing the arg.
*/
public static boolean isChunkUpload(Map<String, Object> args) throws RestServiceException {
return getBooleanHeader(args, Headers.CHUNK_UPLOAD, false);
}

/**
* Ensures the required headers are present.
* @param restRequest The {@link RestRequest} to ensure header presence. Cannot be {@code null}.
Expand Down
@@ -0,0 +1,42 @@
/*
* Copyright 2018 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.router;

import java.util.concurrent.CompletableFuture;


public class FutureCallback<T> implements Callback<T> {
private final CompletableFuture<T> future = new CompletableFuture<>();

/**
* Complete the {@link CompletableFuture} that is returned by {@link #getFuture}.
* @param result The result of the request. This would be non null when the request executed successfully
* @param exception The exception that was reported on execution of the request
*/
@Override
public void onCompletion(T result, Exception exception) {
if (exception != null) {
future.completeExceptionally(exception);
} else {
future.complete(result);
}
}

/**
* @return the {@link CompletableFuture} that will be completed by {@link #onCompletion}.
*/
public CompletableFuture<T> getFuture() {
return future;
}
}
Expand Up @@ -19,8 +19,45 @@
* Represents any options associated with a putBlob request.
*/
public class PutBlobOptions {
private final boolean chunkUpload;

/**
* @param chunkUpload {@code true} to indicate that the {@code putBlob()} call is for a single data chunk of a
* stitched blob.
*/
PutBlobOptions(boolean chunkUpload) {
this.chunkUpload = chunkUpload;
}

/**
* @return {@code true} to indicate that the {@code putBlob()} call is for a single data chunk of a
* stitched blob.
*/
public boolean isChunkUpload() {
return chunkUpload;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

PutBlobOptions that = (PutBlobOptions) o;

return chunkUpload == that.chunkUpload;
}

@Override
public int hashCode() {
return (chunkUpload ? 1 : 0);
}

@Override
public String toString() {
return "PutBlobOptions{}";
return "PutBlobOptions{" + "chunkUpload=" + chunkUpload + '}';
}
}
Expand Up @@ -19,10 +19,21 @@
* A builder for {@link PutBlobOptions} objects.
*/
public class PutBlobOptionsBuilder {
private boolean chunkUpload = false;

/**
* @param chunkUpload {@code true} to indicate that this is an upload of
* @return
*/
public PutBlobOptionsBuilder chunkUpload(boolean chunkUpload) {
this.chunkUpload = chunkUpload;
return this;
}

/**
* @return the {@link PutBlobOptions} built.
*/
public PutBlobOptions build() {
return new PutBlobOptions();
return new PutBlobOptions(chunkUpload);
}
}

0 comments on commit 5bb777b

Please sign in to comment.