Skip to content

Commit

Permalink
feat: implement GrpcStorageImpl#{get,list,create,delete}Notification (#…
Browse files Browse the repository at this point in the history
…1958)

Rewrite ITNotificationTest to leverage testbench for grpc and to separate different cases into their own individual tests rather than having a single large test.

For now the backend hasn't yet implemented the rpcs for grpc, so we rely on testbench. Once the backend does provide we can remove the CrossRun.Ignore annotation on each of the methods.
  • Loading branch information
BenWhitehead committed Apr 5, 2023
1 parent 5a274d7 commit 830052b
Show file tree
Hide file tree
Showing 11 changed files with 425 additions and 88 deletions.
Expand Up @@ -89,6 +89,11 @@ final class ApiaryConversions {
// when converting from gRPC to apiary or vice-versa we want to preserve this property. Until
// such a time as the apiary model has a project field, we manually apply it with this name.
private static final String PROJECT_ID_FIELD_NAME = "x_project";
// gRPC has a NotificationConfig.name property which contains the bucket the config is associated
// with which that apiary doesn't have yet.
// when converting from gRPC to apiary or vice-versa we want to preserve this property. Until
// such a time as the apiary model has a bucket field, we manually apply it with this name.
private static final String NOTIFICATION_BUCKET_FIELD_NAME = "x_bucket";

private final Codec<Entity, String> entityCodec =
Codec.of(this::entityEncode, this::entityDecode);
Expand Down Expand Up @@ -774,6 +779,7 @@ private com.google.api.services.storage.model.Notification notificationEncode(
to.setEtag(from.getEtag());
to.setSelfLink(from.getSelfLink());
to.setTopic(from.getTopic());
ifNonNull(from.getBucket(), b -> to.set(NOTIFICATION_BUCKET_FIELD_NAME, b));
ifNonNull(from.getNotificationId(), to::setId);
ifNonNull(from.getCustomAttributes(), to::setCustomAttributes);
ifNonNull(from.getObjectNamePrefix(), to::setObjectNamePrefix);
Expand All @@ -799,6 +805,7 @@ private com.google.api.services.storage.model.Notification notificationEncode(
private NotificationInfo notificationDecode(
com.google.api.services.storage.model.Notification from) {
NotificationInfo.Builder builder = new NotificationInfo.BuilderImpl(from.getTopic());
ifNonNull(from.get(NOTIFICATION_BUCKET_FIELD_NAME), String.class::cast, builder::setBucket);
ifNonNull(from.getId(), builder::setNotificationId);
ifNonNull(from.getEtag(), builder::setEtag);
ifNonNull(from.getCustomAttributes(), builder::setCustomAttributes);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import static com.google.cloud.storage.Utils.ifNonNull;
import static com.google.cloud.storage.Utils.lift;
import static com.google.cloud.storage.Utils.projectNameCodec;
import static com.google.cloud.storage.Utils.topicNameCodec;

import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.Binding;
Expand All @@ -35,6 +36,8 @@
import com.google.cloud.storage.BucketInfo.PublicAccessPrevention;
import com.google.cloud.storage.Conversions.Codec;
import com.google.cloud.storage.HmacKey.HmacKeyState;
import com.google.cloud.storage.NotificationInfo.EventType;
import com.google.cloud.storage.NotificationInfo.PayloadFormat;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand All @@ -48,6 +51,8 @@
import com.google.storage.v2.BucketAccessControl;
import com.google.storage.v2.CryptoKeyName;
import com.google.storage.v2.HmacKeyMetadata;
import com.google.storage.v2.NotificationConfig;
import com.google.storage.v2.NotificationConfigName;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectAccessControl;
import com.google.storage.v2.ObjectChecksums;
Expand Down Expand Up @@ -918,12 +923,54 @@ private BlobInfo blobInfoDecode(Object from) {
return toBuilder.build();
}

private com.google.storage.v2.NotificationConfig notificationEncode(NotificationInfo from) {
return todo();
private NotificationConfig notificationEncode(NotificationInfo from) {
NotificationConfig.Builder to = NotificationConfig.newBuilder();
String id = from.getNotificationId();
if (id != null) {
if (NotificationConfigName.isParsableFrom(id)) {
ifNonNull(id, to::setName);
} else {
NotificationConfigName name = NotificationConfigName.of("_", from.getBucket(), id);
to.setName(name.toString());
}
}
ifNonNull(from.getTopic(), topicNameCodec::encode, to::setTopic);
ifNonNull(from.getEtag(), to::setEtag);
ifNonNull(from.getEventTypes(), toImmutableListOf(EventType::name), to::addAllEventTypes);
ifNonNull(from.getCustomAttributes(), to::putAllCustomAttributes);
ifNonNull(from.getObjectNamePrefix(), to::setObjectNamePrefix);
ifNonNull(from.getPayloadFormat(), PayloadFormat::name, to::setPayloadFormat);
return to.build();
}

private NotificationInfo notificationDecode(com.google.storage.v2.NotificationConfig from) {
return todo();
private NotificationInfo notificationDecode(NotificationConfig from) {
NotificationInfo.Builder to =
NotificationInfo.newBuilder(topicNameCodec.decode(from.getTopic()));
if (!from.getName().isEmpty()) {
NotificationConfigName parse = NotificationConfigName.parse(from.getName());
// the case where parse could return null is already guarded by the preceding isEmpty check
//noinspection DataFlowIssue
to.setNotificationId(parse.getNotificationConfig());
to.setBucket(parse.getBucket());
}
if (!from.getEtag().isEmpty()) {
to.setEtag(from.getEtag());
}
if (!from.getEventTypesList().isEmpty()) {
EventType[] eventTypes =
from.getEventTypesList().stream().map(EventType::valueOf).toArray(EventType[]::new);
to.setEventTypes(eventTypes);
}
if (!from.getCustomAttributesMap().isEmpty()) {
to.setCustomAttributes(from.getCustomAttributesMap());
}
if (!from.getObjectNamePrefix().isEmpty()) {
to.setObjectNamePrefix(from.getObjectNamePrefix());
}
if (!from.getPayloadFormat().isEmpty()) {
to.setPayloadFormat(PayloadFormat.valueOf(from.getPayloadFormat()));
}
return to.build();
}

private com.google.iam.v1.Policy policyEncode(Policy from) {
Expand Down
Expand Up @@ -30,7 +30,6 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.paging.AbstractPage;
import com.google.api.gax.paging.Page;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
Expand All @@ -39,7 +38,6 @@
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.UnimplementedException;
import com.google.cloud.BaseService;
import com.google.cloud.Policy;
import com.google.cloud.WriteChannel;
Expand Down Expand Up @@ -84,32 +82,39 @@
import com.google.storage.v2.ComposeObjectRequest.SourceObject;
import com.google.storage.v2.CreateBucketRequest;
import com.google.storage.v2.CreateHmacKeyRequest;
import com.google.storage.v2.CreateNotificationConfigRequest;
import com.google.storage.v2.DeleteBucketRequest;
import com.google.storage.v2.DeleteHmacKeyRequest;
import com.google.storage.v2.DeleteNotificationConfigRequest;
import com.google.storage.v2.DeleteObjectRequest;
import com.google.storage.v2.GetBucketRequest;
import com.google.storage.v2.GetHmacKeyRequest;
import com.google.storage.v2.GetNotificationConfigRequest;
import com.google.storage.v2.GetObjectRequest;
import com.google.storage.v2.GetServiceAccountRequest;
import com.google.storage.v2.ListBucketsRequest;
import com.google.storage.v2.ListHmacKeysRequest;
import com.google.storage.v2.ListNotificationConfigsRequest;
import com.google.storage.v2.ListNotificationConfigsResponse;
import com.google.storage.v2.ListObjectsRequest;
import com.google.storage.v2.ListObjectsResponse;
import com.google.storage.v2.LockBucketRetentionPolicyRequest;
import com.google.storage.v2.NotificationConfig;
import com.google.storage.v2.NotificationConfigName;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectAccessControl;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.RewriteObjectRequest;
import com.google.storage.v2.RewriteResponse;
import com.google.storage.v2.StorageClient;
import com.google.storage.v2.StorageClient.ListNotificationConfigsPage;
import com.google.storage.v2.UpdateBucketRequest;
import com.google.storage.v2.UpdateHmacKeyRequest;
import com.google.storage.v2.UpdateObjectRequest;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import com.google.storage.v2.WriteObjectSpec;
import io.grpc.Status.Code;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -1404,23 +1409,92 @@ public ServiceAccount getServiceAccount(String projectId) {

@Override
public Notification createNotification(String bucket, NotificationInfo notificationInfo) {
return throwNotYetImplemented(
fmtMethodName("createNotification", String.class, NotificationInfo.class));
NotificationConfig encode = codecs.notificationInfo().encode(notificationInfo);
CreateNotificationConfigRequest req =
CreateNotificationConfigRequest.newBuilder()
.setParent(bucketNameCodec.encode(bucket))
.setNotificationConfig(encode)
.build();
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.createNotificationConfigCallable().call(req),
syntaxDecoders.notificationConfig);
}

@Override
public Notification getNotification(String bucket, String notificationId) {
return throwNotYetImplemented(fmtMethodName("getNotification", String.class, String.class));
String name;
if (NotificationConfigName.isParsableFrom(notificationId)) {
name = notificationId;
} else {
NotificationConfigName configName = NotificationConfigName.of("_", bucket, notificationId);
name = configName.toString();
}
GetNotificationConfigRequest req =
GetNotificationConfigRequest.newBuilder().setName(name).build();
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
try {
return storageClient.getNotificationConfigCallable().call(req);
} catch (NotFoundException e) {
return null;
}
},
syntaxDecoders.notificationConfig);
}

@Override
public List<Notification> listNotifications(String bucket) {
return throwNotYetImplemented(fmtMethodName("listNotifications", String.class));
ListNotificationConfigsRequest req =
ListNotificationConfigsRequest.newBuilder()
.setParent(bucketNameCodec.encode(bucket))
.build();
ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getFor(req);
return Retrying.run(
getOptions(),
algorithm,
() -> storageClient.listNotificationConfigsPagedCallable().call(req),
resp -> {
TransformingPageDecorator<
ListNotificationConfigsRequest,
ListNotificationConfigsResponse,
NotificationConfig,
ListNotificationConfigsPage,
Notification>
page =
new TransformingPageDecorator<>(
resp.getPage(), syntaxDecoders.notificationConfig, getOptions(), algorithm);
return ImmutableList.copyOf(page.iterateAll());
});
}

@Override
public boolean deleteNotification(String bucket, String notificationId) {
return throwNotYetImplemented(fmtMethodName("deleteNotification", String.class, String.class));
String name;
if (NotificationConfigName.isParsableFrom(notificationId)) {
name = notificationId;
} else {
NotificationConfigName configName = NotificationConfigName.of("_", bucket, notificationId);
name = configName.toString();
}
DeleteNotificationConfigRequest req =
DeleteNotificationConfigRequest.newBuilder().setName(name).build();
return Boolean.TRUE.equals(
Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
try {
storageClient.deleteNotificationConfigCallable().call(req);
return true;
} catch (NotFoundException e) {
return false;
}
},
Decoder.identity()));
}

@Override
Expand Down Expand Up @@ -1448,6 +1522,8 @@ private final class SyntaxDecoders {
o -> codecs.blobInfo().decode(o).asBlob(GrpcStorageImpl.this);
final Decoder<com.google.storage.v2.Bucket, Bucket> bucket =
b -> codecs.bucketInfo().decode(b).asBucket(GrpcStorageImpl.this);
final Decoder<NotificationConfig, Notification> notificationConfig =
n -> codecs.notificationInfo().decode(n).asNotification(GrpcStorageImpl.this);
}

/**
Expand Down Expand Up @@ -1668,14 +1744,6 @@ static <T> T throwHttpJsonOnly(Class<?> clazz, String methodName) {
throw new UnsupportedOperationException(message);
}

static <T> T throwNotYetImplemented(String methodName) {
String message =
String.format(
"%s#%s is not yet implemented for GRPC transport. Please use StorageOptions.http() to construct a compatible instance in the interim.",
Storage.class.getName(), methodName);
throw new UnimplementedException(message, null, GrpcStatusCode.of(Code.UNIMPLEMENTED), false);
}

private static String fmtMethodName(String name, Class<?>... args) {
return name
+ "("
Expand Down
Expand Up @@ -88,6 +88,12 @@ public Builder setCustomAttributes(Map<String, String> customAttributes) {
return this;
}

@Override
Builder setBucket(String bucket) {
infoBuilder.setBucket(bucket);
return this;
}

@Override
public Notification build() {
return new Notification(storage, infoBuilder);
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.InternalApi;
import com.google.api.pathtemplate.PathTemplate;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
Expand All @@ -34,11 +35,13 @@ public class NotificationInfo implements Serializable {
private static final PathTemplate PATH_TEMPLATE =
PathTemplate.createWithoutUrlEncoding("projects/{project}/topics/{topic}");

// TODO: Change to StringEnum in next major version
public enum PayloadFormat {
JSON_API_V1,
NONE
}

// TODO: Change to StringEnum in next major version
public enum EventType {
OBJECT_FINALIZE,
OBJECT_METADATA_UPDATE,
Expand All @@ -54,6 +57,7 @@ public enum EventType {
private final String objectNamePrefix;
private final String etag;
private final String selfLink;
private final String bucket;

/** Builder for {@code NotificationInfo}. */
public abstract static class Builder {
Expand All @@ -75,6 +79,8 @@ public abstract static class Builder {

public abstract Builder setCustomAttributes(Map<String, String> customAttributes);

abstract Builder setBucket(String bucket);

/** Creates a {@code NotificationInfo} object. */
public abstract NotificationInfo build();
}
Expand All @@ -90,6 +96,7 @@ public static class BuilderImpl extends Builder {
private String objectNamePrefix;
private String etag;
private String selfLink;
private String bucket;

BuilderImpl(String topic) {
this.topic = topic;
Expand All @@ -104,6 +111,7 @@ public static class BuilderImpl extends Builder {
customAttributes = notificationInfo.customAttributes;
payloadFormat = notificationInfo.payloadFormat;
objectNamePrefix = notificationInfo.objectNamePrefix;
bucket = notificationInfo.bucket;
}

@Override
Expand Down Expand Up @@ -156,6 +164,12 @@ public Builder setCustomAttributes(Map<String, String> customAttributes) {
return this;
}

@Override
Builder setBucket(String bucket) {
this.bucket = bucket;
return this;
}

public NotificationInfo build() {
checkNotNull(topic);
checkTopicFormat(topic);
Expand All @@ -172,6 +186,7 @@ public NotificationInfo build() {
customAttributes = builder.customAttributes;
payloadFormat = builder.payloadFormat;
objectNamePrefix = builder.objectNamePrefix;
bucket = builder.bucket;
}

/** Returns the service-generated id for the notification. */
Expand Down Expand Up @@ -225,6 +240,15 @@ public Map<String, String> getCustomAttributes() {
return customAttributes;
}

/**
* gRPC has the bucket name encoded in the notification name, use this internal property to track
* it.
*/
@InternalApi
String getBucket() {
return bucket;
}

@Override
public int hashCode() {
return Objects.hash(
Expand Down

0 comments on commit 830052b

Please sign in to comment.